Dagster 数据管道
一句话概述:Dagster 是以"数据资产"为中心的编排平台,不是告诉系统"做什么步骤",而是声明"我想要什么数据",让系统自动推断执行逻辑。
核心知识点表
| 概念 | 白话解释 |
|---|
| Asset | 数据资产,就是你想要的最终产物(一张表、一个模型、一个报告) |
| Op | 操作,一个具体的计算步骤(类似Airflow的Task) |
| Job | 把多个Op或Asset组合成一个可执行的"任务包" |
| Resource | 外部资源的连接(数据库、API等),类似Airflow的Connection |
| IO Manager | 管理数据怎么存、怎么读的组件(存到文件?数据库?) |
| Sensor | 传感器,监听外部事件并触发Job |
| Schedule | 定时调度,按Cron表达式触发 |
| Partition | 分区,把大任务按时间或其他维度拆分(比如按天处理) |
| Dagster+ | Dagster的云服务(商业版) |
版本信息(2026年5月)
- 最新版本:Dagster 1.13.4
- Python支持:3.9 - 3.14
- 亮点:Dagster Skills、分区资产检查、虚拟资产
安装配置
# 创建虚拟环境
conda create -n dagster python=3.12 -y # 创建Python 3.12环境
conda activate dagster # 激活
# 安装Dagster(核心 + Web UI + 常用集成)
pip install dagster dagster-webserver # 核心包 + Web界面
# 创建新项目(脚手架)
dagster project scaffold --name my_project # 自动生成项目结构
cd my_project # 进入项目
# 安装项目依赖
pip install -e ".[dev]" # 以开发模式安装
# 启动开发服务器
dagster dev # 启动后访问 http://localhost:3000
项目结构
my_project/
├── my_project/
│ ├── __init__.py # 项目入口,注册资产和资源
│ ├── assets/ # 数据资产定义
│ │ └── __init__.py
│ ├── resources/ # 资源配置(数据库连接等)
│ └── jobs/ # Job定义
├── tests/ # 测试
├── setup.py # 包配置
└── pyproject.toml # 项目配置
基本使用
定义数据资产
# my_project/assets/__init__.py
from dagster import asset, AssetExecutionContext # 导入资产装饰器
import pandas as pd # 导入pandas
@asset(
group_name="raw_data", # 资产分组
description="从CSV读取原始用户数据", # 描述(会显示在UI上)
)
def raw_users(context: AssetExecutionContext) -> pd.DataFrame:
"""读取原始用户数据"""
df = pd.read_csv("data/users.csv") # 读取CSV文件
context.log.info(f"读取了 {len(df)} 条用户记录") # 记录日志
return df # 返回DataFrame,Dagster自动存储
@asset(
group_name="transformed",
description="清洗后的用户数据",
)
def clean_users(raw_users: pd.DataFrame) -> pd.DataFrame:
"""清洗用户数据(自动依赖raw_users)"""
# 参数名 raw_users 匹配上面的资产名,Dagster自动建立依赖
df = raw_users.dropna() # 删除空值
df = df.drop_duplicates(subset=["email"]) # 按邮箱去重
return df
@asset(
group_name="analytics",
description="用户统计报表",
)
def user_stats(clean_users: pd.DataFrame) -> dict:
"""生成用户统计"""
stats = {
"total_users": len(clean_users), # 总用户数
"avg_age": float(clean_users["age"].mean()), # 平均年龄
}
return stats
注册资产到项目
# my_project/__init__.py
from dagster import Definitions, load_assets_from_modules # 导入工具
from my_project import assets # 导入资产模块
# 加载所有资产并注册
defs = Definitions(
assets=load_assets_from_modules([assets]), # 自动扫描assets模块
)
定义Job和Schedule
from dagster import (
define_asset_job, # 基于资产定义Job
ScheduleDefinition, # 定义调度
AssetSelection, # 选择哪些资产
)
# 定义一个Job:物化(生成)所有analytics组的资产
analytics_job = define_asset_job(
name="refresh_analytics", # Job名称
selection=AssetSelection.groups("analytics"), # 选择analytics组
)
# 定义调度:每天凌晨2点刷新
daily_schedule = ScheduleDefinition(
job=analytics_job, # 关联的Job
cron_schedule="0 2 * * *", # Cron表达式
)
高级用法
分区资产
from dagster import asset, DailyPartitionsDefinition # 按天分区
# 定义按天分区
daily_partitions = DailyPartitionsDefinition(
start_date="2026-01-01", # 分区开始日期
)
@asset(
partitions_def=daily_partitions, # 应用分区定义
description="每日销售数据",
)
def daily_sales(context: AssetExecutionContext) -> pd.DataFrame:
"""按天处理销售数据"""
partition_date = context.partition_key # 获取当前分区日期,如"2026-05-13"
context.log.info(f"处理 {partition_date} 的数据")
# 只查询当天的数据
df = pd.read_sql(
f"SELECT * FROM sales WHERE date = '{partition_date}'",
con=db_connection,
)
return df
Resource资源管理
from dagster import resource, asset, Definitions, ConfigurableResource
# 定义可配置的资源
class DatabaseResource(ConfigurableResource):
"""数据库资源"""
host: str # 数据库地址
port: int = 5432 # 端口,默认5432
database: str # 数据库名
def query(self, sql: str):
"""执行SQL查询"""
import psycopg2
conn = psycopg2.connect(
host=self.host,
port=self.port,
database=self.database,
)
return pd.read_sql(sql, conn)
@asset
def sales_data(db: DatabaseResource) -> pd.DataFrame:
"""用数据库资源查询销售数据"""
return db.query("SELECT * FROM sales LIMIT 1000")
# 注册资源
defs = Definitions(
assets=[sales_data],
resources={
"db": DatabaseResource( # 注入数据库资源
host="localhost",
database="mydb",
),
},
)
Sensor事件驱动
from dagster import sensor, RunRequest, SensorEvaluationContext
import os
@sensor(
job=analytics_job, # 触发哪个Job
minimum_interval_seconds=30, # 最小检查间隔30秒
)
def new_file_sensor(context: SensorEvaluationContext):
"""监听新文件到达"""
data_dir = "/data/incoming"
for filename in os.listdir(data_dir):
filepath = os.path.join(data_dir, filename)
if filename.endswith(".csv"):
yield RunRequest( # 发现新文件就触发一次Job
run_key=filename, # 用文件名做去重键,同一文件不会触发两次
run_config={"filename": filepath},
)
测试资产
# tests/test_assets.py
import pandas as pd
from my_project.assets import raw_users, clean_users, user_stats
def test_clean_users():
"""测试数据清洗逻辑"""
# 构造测试数据(有空值和重复)
test_data = pd.DataFrame({
"name": ["Alice", "Bob", None, "Alice"],
"email": ["a@test.com", "b@test.com", "c@test.com", "a@test.com"],
"age": [25, 30, None, 25],
})
result = clean_users(test_data) # 直接调用函数测试
assert len(result) <= len(test_data) # 清洗后不应更多
assert result["email"].is_unique # 邮箱应该唯一
assert result.isna().sum().sum() == 0 # 没有空值
常见报错与解决
| 报错信息 | 原因 | 解决方案 |
|---|
DagsterInvalidDefinitionError | 资产定义有误(参数名和资产名不匹配) | 检查函数参数名是否和依赖的资产名一致 |
No module named dagster | 没安装或环境没激活 | conda activate dagster && pip install dagster |
Asset key not found | 引用了不存在的资产 | 检查资产是否在Definitions中注册 |
dagster dev 启动失败 | 端口被占用 | dagster dev -p 3001 换端口 |
IOManager error | 数据存储配置有问题 | 检查IO Manager配置,默认用文件系统 |
Partition key invalid | 分区键格式不对 | 确保日期格式匹配分区定义 |
速查表
# ===== CLI命令 =====
dagster dev # 启动开发服务器
dagster dev -p 3001 # 指定端口
dagster project scaffold --name X # 创建新项目
dagster asset materialize -m X # 物化资产
dagster job execute -m X -j Y # 执行Job
dagster instance info # 显示实例信息
# ===== 常用装饰器 =====
# @asset 基本资产
# @asset(group_name="X") 资产分组
# @asset(partitions_def=X) 分区资产
# @asset(deps=["other_asset"]) 显式依赖
# @asset(key_prefix="schema") 资产键前缀
# @multi_asset 一个函数产出多个资产
# @op 基本操作
# @job 定义Job
# @sensor 事件传感器
# @schedule 定时调度
同类工具对比
| 特性 | Dagster | Airflow | Prefect |
|---|
| 核心理念 | 数据资产为中心 | 任务编排为中心 | Python函数为中心 |
| 数据血缘 | 原生自动生成 | 需额外配置 | 有限支持 |
| 测试友好度 | ★★★★★(最好) | ★★★ | ★★★★ |
| 数据质量检查 | 内置Asset Check | 需插件 | 需手写 |
| 适合场景 | 数据质量优先的团队 | 大型企业级流程 | 快速上手的小团队 |
面试建议:Dagster最大的差异化卖点是"Asset-centric"(资产为中心),而不是"Task-centric"(任务为中心)。面试时可以说:"Dagster关注的是数据产物本身,而不是产生数据的步骤,这让数据血缘和质量检查变得很自然。"