Dagster 数据编排完全指南¶
为什么要学 Dagster¶
Software-Defined Assets(SDA)革新数据编排范式:Dagster 的核心抽象不是"任务"(做什么),而是"资产"(产出什么)。你定义的是数据资产(表、文件、模型),Dagster 自动推导出执行计划。这让数据血缘(lineage)成为一等公民。
开发体验极佳:Dagster UI(Dagit)是数据编排领域最好的 UI。资产图谱可视化、运行时间线、传感器监控、日志聚合,一目了然。本地开发时
dagster dev即可启动完整环境。可测试性设计:Dagster 从第一天就为可测试性设计。I/O Manager 抽象让你可以在测试中替换数据源/目标,Resource 系统支持依赖注入。数据管道可以像应用代码一样写单元测试。
类型系统与数据质量:内置 Asset Checks 让你在资产生成后自动验证数据质量。配合 Dagster 的类型系统,可以在管道级别保证数据契约。
与 Airflow 相比的优势:不需要维护复杂的基础设施(Scheduler+Webserver+Worker+DB),不需要与 XCom 搏斗,不需要写样板代码。Dagster 甚至提供从 Airflow 的迁移工具。
核心概念详解¶
Dagster 是什么(白话解释)¶
传统的工作流编排(如 Airflow)关注的是"做什么"(任务):先运行 Task A,再运行 Task B,然后运行 Task C。
Dagster 换了个角度,关注"产出什么"(资产):我有一个"原始数据"资产,一个"清洗后数据"资产,一个"分析报告"资产。Dagster 自动知道它们之间的依赖关系,按正确顺序执行。
好处是什么?当你看 Dagster 的资产图谱时,看到的是你的数据全貌——哪些表依赖哪些表,数据从哪里来到哪里去。而不是一堆抽象的任务节点。
核心概念¶
| 概念 | 说明 |
|---|---|
| Asset | 数据资产(表、文件、模型等持久化的数据产物) |
| Op | 操作(计算单元,类似 Airflow 的 Operator) |
| Job | 一组要执行的 ops 或 assets |
| Schedule | 基于时间的调度 |
| Sensor | 基于事件的触发(文件到达、API 变化等) |
| Resource | 外部系统连接(数据库、API、存储等) |
| I/O Manager | 控制数据如何读写(存储抽象层) |
| Partition | 数据分区(按时间、类别等) |
| Asset Check | 资产质量检查 |
| Definition | 代码位置(一组 assets、jobs、schedules 的集合) |
Dagster vs Airflow 对比¶
| 特性 | Dagster | Airflow |
|---|---|---|
| 核心抽象 | Asset(资产优先) | Task/DAG(任务优先) |
| 数据血缘 | 核心特性(自动推导) | 需要额外工具 |
| 本地开发 | dagster dev | 需要完整部署 |
| 测试 | 原生支持(DI/Resource) | 困难 |
| UI | 现代化(资产图谱) | 传统(DAG视图) |
| 参数传递 | Python 原生 | XCom(JSON序列化) |
| 数据质量 | Asset Checks | 需第三方(Great Expectations) |
| 分区 | 内置(时间/静态/动态) | 手动实现 |
| 部署 | 简单(单进程可运行) | 复杂(多组件) |
| 学习曲线 | 中(概念较多但清晰) | 中高 |
| 社区规模 | 增长中 | 最大 |
| Python版本 | 3.8+ | 3.8+ |
安装与配置¶
# 安装 Dagster
pip install dagster dagster-webserver
# 常用集成
pip install dagster-duckdb dagster-dbt dagster-pandas dagster-polars
pip install dagster-postgres dagster-aws dagster-gcp
# 创建新项目
dagster project scaffold --name my-data-project
cd my-data-project
pip install -e ".[dev]"
# 启动开发服务器
dagster dev
# 打开 http://127.0.0.1:3000
项目结构¶
my_data_project/
├── my_data_project/
│ ├── __init__.py
│ ├── definitions.py # 主定义文件
│ ├── assets/
│ │ ├── __init__.py
│ │ ├── raw.py # 原始数据资产
│ │ ├── staging.py # 暂存层资产
│ │ └── marts.py # 数据集市资产
│ ├── resources/
│ │ └── __init__.py # 资源定义
│ ├── sensors/
│ │ └── __init__.py
│ └── schedules/
│ └── __init__.py
├── tests/
│ ├── test_assets.py
│ └── test_resources.py
├── pyproject.toml
└── setup.py
快速上手:5 分钟最小示例¶
my_project/definitions.py:
import dagster as dg
import pandas as pd
@dg.asset(
description="从 CSV 加载原始订单数据",
group_name="raw",
)
def raw_orders() -> pd.DataFrame:
"""原始订单数据"""
return pd.DataFrame({
"order_id": [1, 2, 3, 4, 5],
"product": ["苹果", "香蕉", "苹果", "橙子", "香蕉"],
"amount": [100, 80, 120, 90, 110],
"date": pd.to_datetime(["2024-12-01", "2024-12-01", "2024-12-02", "2024-12-02", "2024-12-03"]),
})
@dg.asset(
description="按日期聚合的订单统计",
group_name="analytics",
)
def daily_stats(raw_orders: pd.DataFrame) -> pd.DataFrame:
"""依赖 raw_orders 资产"""
return raw_orders.groupby("date").agg(
total_revenue=("amount", "sum"),
order_count=("order_id", "count"),
avg_order=("amount", "mean"),
).reset_index()
@dg.asset(
description="按产品聚合的销售排名",
group_name="analytics",
)
def product_ranking(raw_orders: pd.DataFrame) -> pd.DataFrame:
"""依赖 raw_orders 资产"""
return (raw_orders
.groupby("product")
.agg(total_sales=("amount", "sum"), count=("order_id", "count"))
.sort_values("total_sales", ascending=False)
.reset_index())
defs = dg.Definitions(
assets=[raw_orders, daily_stats, product_ranking],
)
运行:
进阶用法¶
场景一:I/O Manager(数据存储抽象)¶
import dagster as dg
from dagster_duckdb_pandas import DuckDBPandasIOManager
import pandas as pd
# 定义资源
duckdb_io = DuckDBPandasIOManager(database="analytics.duckdb")
@dg.asset(group_name="raw")
def customers() -> pd.DataFrame:
return pd.read_csv("data/customers.csv")
@dg.asset(group_name="raw")
def orders() -> pd.DataFrame:
return pd.read_csv("data/orders.csv")
@dg.asset(group_name="analytics")
def customer_orders(customers: pd.DataFrame, orders: pd.DataFrame) -> pd.DataFrame:
return customers.merge(orders, on="customer_id")
defs = dg.Definitions(
assets=[customers, orders, customer_orders],
resources={
"io_manager": duckdb_io, # 所有资产自动存储到 DuckDB
},
)
场景二:分区资产(Partitioned Assets)¶
import dagster as dg
import pandas as pd
# 按日分区
daily_partition = dg.DailyPartitionsDefinition(start_date="2024-01-01")
@dg.asset(
partitions_def=daily_partition,
group_name="raw",
)
def daily_events(context: dg.AssetExecutionContext) -> pd.DataFrame:
"""按天获取事件数据"""
date = context.partition_key # "2024-12-01"
context.log.info(f"获取 {date} 的数据")
# 从 API 获取当天数据
events = fetch_events_for_date(date)
return pd.DataFrame(events)
@dg.asset(
partitions_def=daily_partition,
group_name="analytics",
)
def daily_metrics(context: dg.AssetExecutionContext, daily_events: pd.DataFrame) -> pd.DataFrame:
"""计算每日指标"""
return daily_events.groupby("event_type").agg(count=("id", "count")).reset_index()
场景三:Sensor(事件驱动触发)¶
import dagster as dg
import os
@dg.sensor(
job=dg.define_asset_job("process_new_files", selection=[raw_data, processed_data]),
minimum_interval_seconds=30,
)
def new_file_sensor(context: dg.SensorEvaluationContext):
"""监控目录,有新文件就触发处理"""
watch_dir = "/data/incoming/"
last_mtime = float(context.cursor or 0)
new_files = []
max_mtime = last_mtime
for fname in os.listdir(watch_dir):
fpath = os.path.join(watch_dir, fname)
mtime = os.path.getmtime(fpath)
if mtime > last_mtime:
new_files.append(fname)
max_mtime = max(max_mtime, mtime)
if new_files:
context.update_cursor(str(max_mtime))
yield dg.RunRequest(
run_key=f"files-{max_mtime}",
run_config={"ops": {"raw_data": {"config": {"files": new_files}}}},
)
场景四:Schedule(定时调度)¶
import dagster as dg
# 定义要调度的 Job
daily_job = dg.define_asset_job(
name="daily_analytics",
selection=dg.AssetSelection.groups("analytics"),
partitions_def=dg.DailyPartitionsDefinition(start_date="2024-01-01"),
)
# Cron 调度
daily_schedule = dg.ScheduleDefinition(
job=daily_job,
cron_schedule="0 6 * * *", # 每天早上6点
default_status=dg.DefaultScheduleStatus.RUNNING,
)
defs = dg.Definitions(
assets=[...],
schedules=[daily_schedule],
)
场景五:Asset Checks(数据质量检查)¶
import dagster as dg
import pandas as pd
@dg.asset
def orders() -> pd.DataFrame:
return pd.read_csv("orders.csv")
@dg.asset_check(asset=orders, description="订单金额不能为负")
def no_negative_amounts(orders: pd.DataFrame):
negative_count = (orders["amount"] < 0).sum()
return dg.AssetCheckResult(
passed=negative_count == 0,
metadata={"negative_count": int(negative_count)},
)
@dg.asset_check(asset=orders, description="没有重复订单ID")
def unique_order_ids(orders: pd.DataFrame):
duplicates = orders["order_id"].duplicated().sum()
return dg.AssetCheckResult(
passed=duplicates == 0,
metadata={"duplicate_count": int(duplicates)},
)
@dg.asset_check(asset=orders, description="订单数据时效性")
def data_freshness(orders: pd.DataFrame):
latest = pd.to_datetime(orders["date"]).max()
hours_old = (pd.Timestamp.now() - latest).total_seconds() / 3600
return dg.AssetCheckResult(
passed=hours_old < 24,
metadata={"hours_since_latest": round(hours_old, 1)},
)
场景六:dbt 集成¶
from dagster_dbt import DbtCliResource, dbt_assets, DbtProject
dbt_project = DbtProject(project_dir="dbt_project/")
@dbt_assets(manifest=dbt_project.manifest_path)
def my_dbt_assets(context: dg.AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()
defs = dg.Definitions(
assets=[my_dbt_assets],
resources={"dbt": DbtCliResource(project_dir=dbt_project)},
)
场景七:Resource 依赖注入¶
import dagster as dg
from dagster import ConfigurableResource
class DatabaseResource(ConfigurableResource):
host: str
port: int = 5432
database: str
user: str
password: str
def query(self, sql: str):
import psycopg2
conn = psycopg2.connect(
host=self.host, port=self.port,
database=self.database,
user=self.user, password=self.password,
)
return pd.read_sql(sql, conn)
@dg.asset
def user_stats(db: DatabaseResource) -> pd.DataFrame:
return db.query("SELECT * FROM user_stats")
# 生产配置
prod_defs = dg.Definitions(
assets=[user_stats],
resources={"db": DatabaseResource(
host="prod-db.example.com",
database="analytics",
user=dg.EnvVar("DB_USER"),
password=dg.EnvVar("DB_PASSWORD"),
)},
)
常见问题与排错¶
问题一:Asset 依赖不正确¶
资产之间的依赖通过函数参数名推导。参数名必须与上游资产名匹配:
@dg.asset
def raw_data(): # 资产名 = "raw_data"
return ...
@dg.asset
def processed(raw_data): # 参数名 "raw_data" → 依赖上游 "raw_data"
return ...
问题二:I/O Manager 与自定义存储¶
# 如果不想用 I/O Manager,可以返回 None 并自己处理存储
@dg.asset(io_manager_key="no_io")
def custom_storage_asset():
df = process_data()
df.to_parquet("s3://bucket/output.parquet")
# 不返回值,让 Dagster 知道资产已物化
问题三:开发 vs 生产资源切换¶
# 使用不同的 Definitions 实例
dev_defs = dg.Definitions(
assets=all_assets,
resources={"db": DatabaseResource(host="localhost", ...)},
)
prod_defs = dg.Definitions(
assets=all_assets,
resources={"db": DatabaseResource(host="prod-server", ...)},
)
问题四:性能优化 - 大量小资产¶
# 使用 multi_asset 减少开销
@dg.multi_asset(
outs={
"users": dg.AssetOut(),
"orders": dg.AssetOut(),
"products": dg.AssetOut(),
}
)
def extract_all():
return pd.read_csv("users.csv"), pd.read_csv("orders.csv"), pd.read_csv("products.csv")
问题五:如何调试¶
# 在测试中执行资产
def test_daily_stats():
result = dg.materialize([raw_orders, daily_stats])
assert result.success
df = result.output_for_node("daily_stats")
assert len(df) > 0
参考资源¶
- 官方文档:https://docs.dagster.io/
- GitHub:https://github.com/dagster-io/dagster
- Dagster University:https://courses.dagster.io/
- Dagster Blog:https://dagster.io/blog
- Slack 社区:https://dagster.io/slack
- Dagster Cloud:https://dagster.cloud/
- Dagster + dbt 指南:https://docs.dagster.io/integrations/dbt
- 从 Airflow 迁移:https://docs.dagster.io/integrations/airflow