跳转至

Dagster 数据编排完全指南

为什么要学 Dagster

  1. Software-Defined Assets(SDA)革新数据编排范式:Dagster 的核心抽象不是"任务"(做什么),而是"资产"(产出什么)。你定义的是数据资产(表、文件、模型),Dagster 自动推导出执行计划。这让数据血缘(lineage)成为一等公民。

  2. 开发体验极佳:Dagster UI(Dagit)是数据编排领域最好的 UI。资产图谱可视化、运行时间线、传感器监控、日志聚合,一目了然。本地开发时 dagster dev 即可启动完整环境。

  3. 可测试性设计:Dagster 从第一天就为可测试性设计。I/O Manager 抽象让你可以在测试中替换数据源/目标,Resource 系统支持依赖注入。数据管道可以像应用代码一样写单元测试。

  4. 类型系统与数据质量:内置 Asset Checks 让你在资产生成后自动验证数据质量。配合 Dagster 的类型系统,可以在管道级别保证数据契约。

  5. 与 Airflow 相比的优势:不需要维护复杂的基础设施(Scheduler+Webserver+Worker+DB),不需要与 XCom 搏斗,不需要写样板代码。Dagster 甚至提供从 Airflow 的迁移工具。


核心概念详解

Dagster 是什么(白话解释)

传统的工作流编排(如 Airflow)关注的是"做什么"(任务):先运行 Task A,再运行 Task B,然后运行 Task C。

Dagster 换了个角度,关注"产出什么"(资产):我有一个"原始数据"资产,一个"清洗后数据"资产,一个"分析报告"资产。Dagster 自动知道它们之间的依赖关系,按正确顺序执行。

好处是什么?当你看 Dagster 的资产图谱时,看到的是你的数据全貌——哪些表依赖哪些表,数据从哪里来到哪里去。而不是一堆抽象的任务节点。

核心概念

概念说明
Asset数据资产(表、文件、模型等持久化的数据产物)
Op操作(计算单元,类似 Airflow 的 Operator)
Job一组要执行的 ops 或 assets
Schedule基于时间的调度
Sensor基于事件的触发(文件到达、API 变化等)
Resource外部系统连接(数据库、API、存储等)
I/O Manager控制数据如何读写(存储抽象层)
Partition数据分区(按时间、类别等)
Asset Check资产质量检查
Definition代码位置(一组 assets、jobs、schedules 的集合)

Dagster vs Airflow 对比

特性DagsterAirflow
核心抽象Asset(资产优先)Task/DAG(任务优先)
数据血缘核心特性(自动推导)需要额外工具
本地开发dagster dev需要完整部署
测试原生支持(DI/Resource)困难
UI现代化(资产图谱)传统(DAG视图)
参数传递Python 原生XCom(JSON序列化)
数据质量Asset Checks需第三方(Great Expectations)
分区内置(时间/静态/动态)手动实现
部署简单(单进程可运行)复杂(多组件)
学习曲线中(概念较多但清晰)中高
社区规模增长中最大
Python版本3.8+3.8+

安装与配置

# 安装 Dagster
pip install dagster dagster-webserver

# 常用集成
pip install dagster-duckdb dagster-dbt dagster-pandas dagster-polars
pip install dagster-postgres dagster-aws dagster-gcp

# 创建新项目
dagster project scaffold --name my-data-project
cd my-data-project
pip install -e ".[dev]"

# 启动开发服务器
dagster dev
# 打开 http://127.0.0.1:3000

项目结构

my_data_project/
├── my_data_project/
│   ├── __init__.py
│   ├── definitions.py     # 主定义文件
│   ├── assets/
│   │   ├── __init__.py
│   │   ├── raw.py          # 原始数据资产
│   │   ├── staging.py      # 暂存层资产
│   │   └── marts.py        # 数据集市资产
│   ├── resources/
│   │   └── __init__.py     # 资源定义
│   ├── sensors/
│   │   └── __init__.py
│   └── schedules/
│       └── __init__.py
├── tests/
│   ├── test_assets.py
│   └── test_resources.py
├── pyproject.toml
└── setup.py

快速上手:5 分钟最小示例

my_project/definitions.py

import dagster as dg
import pandas as pd

@dg.asset(
    description="从 CSV 加载原始订单数据",
    group_name="raw",
)
def raw_orders() -> pd.DataFrame:
    """原始订单数据"""
    return pd.DataFrame({
        "order_id": [1, 2, 3, 4, 5],
        "product": ["苹果", "香蕉", "苹果", "橙子", "香蕉"],
        "amount": [100, 80, 120, 90, 110],
        "date": pd.to_datetime(["2024-12-01", "2024-12-01", "2024-12-02", "2024-12-02", "2024-12-03"]),
    })

@dg.asset(
    description="按日期聚合的订单统计",
    group_name="analytics",
)
def daily_stats(raw_orders: pd.DataFrame) -> pd.DataFrame:
    """依赖 raw_orders 资产"""
    return raw_orders.groupby("date").agg(
        total_revenue=("amount", "sum"),
        order_count=("order_id", "count"),
        avg_order=("amount", "mean"),
    ).reset_index()

@dg.asset(
    description="按产品聚合的销售排名",
    group_name="analytics",
)
def product_ranking(raw_orders: pd.DataFrame) -> pd.DataFrame:
    """依赖 raw_orders 资产"""
    return (raw_orders
        .groupby("product")
        .agg(total_sales=("amount", "sum"), count=("order_id", "count"))
        .sort_values("total_sales", ascending=False)
        .reset_index())

defs = dg.Definitions(
    assets=[raw_orders, daily_stats, product_ranking],
)

运行:

dagster dev -f definitions.py
# 打开 http://127.0.0.1:3000
# 在 UI 中可以看到资产图谱,点击 Materialize 执行


进阶用法

场景一:I/O Manager(数据存储抽象)

import dagster as dg
from dagster_duckdb_pandas import DuckDBPandasIOManager
import pandas as pd

# 定义资源
duckdb_io = DuckDBPandasIOManager(database="analytics.duckdb")

@dg.asset(group_name="raw")
def customers() -> pd.DataFrame:
    return pd.read_csv("data/customers.csv")

@dg.asset(group_name="raw")
def orders() -> pd.DataFrame:
    return pd.read_csv("data/orders.csv")

@dg.asset(group_name="analytics")
def customer_orders(customers: pd.DataFrame, orders: pd.DataFrame) -> pd.DataFrame:
    return customers.merge(orders, on="customer_id")

defs = dg.Definitions(
    assets=[customers, orders, customer_orders],
    resources={
        "io_manager": duckdb_io,  # 所有资产自动存储到 DuckDB
    },
)

场景二:分区资产(Partitioned Assets)

import dagster as dg
import pandas as pd

# 按日分区
daily_partition = dg.DailyPartitionsDefinition(start_date="2024-01-01")

@dg.asset(
    partitions_def=daily_partition,
    group_name="raw",
)
def daily_events(context: dg.AssetExecutionContext) -> pd.DataFrame:
    """按天获取事件数据"""
    date = context.partition_key  # "2024-12-01"
    context.log.info(f"获取 {date} 的数据")

    # 从 API 获取当天数据
    events = fetch_events_for_date(date)
    return pd.DataFrame(events)

@dg.asset(
    partitions_def=daily_partition,
    group_name="analytics",
)
def daily_metrics(context: dg.AssetExecutionContext, daily_events: pd.DataFrame) -> pd.DataFrame:
    """计算每日指标"""
    return daily_events.groupby("event_type").agg(count=("id", "count")).reset_index()

场景三:Sensor(事件驱动触发)

import dagster as dg
import os

@dg.sensor(
    job=dg.define_asset_job("process_new_files", selection=[raw_data, processed_data]),
    minimum_interval_seconds=30,
)
def new_file_sensor(context: dg.SensorEvaluationContext):
    """监控目录,有新文件就触发处理"""
    watch_dir = "/data/incoming/"
    last_mtime = float(context.cursor or 0)

    new_files = []
    max_mtime = last_mtime

    for fname in os.listdir(watch_dir):
        fpath = os.path.join(watch_dir, fname)
        mtime = os.path.getmtime(fpath)
        if mtime > last_mtime:
            new_files.append(fname)
            max_mtime = max(max_mtime, mtime)

    if new_files:
        context.update_cursor(str(max_mtime))
        yield dg.RunRequest(
            run_key=f"files-{max_mtime}",
            run_config={"ops": {"raw_data": {"config": {"files": new_files}}}},
        )

场景四:Schedule(定时调度)

import dagster as dg

# 定义要调度的 Job
daily_job = dg.define_asset_job(
    name="daily_analytics",
    selection=dg.AssetSelection.groups("analytics"),
    partitions_def=dg.DailyPartitionsDefinition(start_date="2024-01-01"),
)

# Cron 调度
daily_schedule = dg.ScheduleDefinition(
    job=daily_job,
    cron_schedule="0 6 * * *",  # 每天早上6点
    default_status=dg.DefaultScheduleStatus.RUNNING,
)

defs = dg.Definitions(
    assets=[...],
    schedules=[daily_schedule],
)

场景五:Asset Checks(数据质量检查)

import dagster as dg
import pandas as pd

@dg.asset
def orders() -> pd.DataFrame:
    return pd.read_csv("orders.csv")

@dg.asset_check(asset=orders, description="订单金额不能为负")
def no_negative_amounts(orders: pd.DataFrame):
    negative_count = (orders["amount"] < 0).sum()
    return dg.AssetCheckResult(
        passed=negative_count == 0,
        metadata={"negative_count": int(negative_count)},
    )

@dg.asset_check(asset=orders, description="没有重复订单ID")
def unique_order_ids(orders: pd.DataFrame):
    duplicates = orders["order_id"].duplicated().sum()
    return dg.AssetCheckResult(
        passed=duplicates == 0,
        metadata={"duplicate_count": int(duplicates)},
    )

@dg.asset_check(asset=orders, description="订单数据时效性")
def data_freshness(orders: pd.DataFrame):
    latest = pd.to_datetime(orders["date"]).max()
    hours_old = (pd.Timestamp.now() - latest).total_seconds() / 3600
    return dg.AssetCheckResult(
        passed=hours_old < 24,
        metadata={"hours_since_latest": round(hours_old, 1)},
    )

场景六:dbt 集成

from dagster_dbt import DbtCliResource, dbt_assets, DbtProject

dbt_project = DbtProject(project_dir="dbt_project/")

@dbt_assets(manifest=dbt_project.manifest_path)
def my_dbt_assets(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

defs = dg.Definitions(
    assets=[my_dbt_assets],
    resources={"dbt": DbtCliResource(project_dir=dbt_project)},
)

场景七:Resource 依赖注入

import dagster as dg
from dagster import ConfigurableResource

class DatabaseResource(ConfigurableResource):
    host: str
    port: int = 5432
    database: str
    user: str
    password: str

    def query(self, sql: str):
        import psycopg2
        conn = psycopg2.connect(
            host=self.host, port=self.port,
            database=self.database,
            user=self.user, password=self.password,
        )
        return pd.read_sql(sql, conn)

@dg.asset
def user_stats(db: DatabaseResource) -> pd.DataFrame:
    return db.query("SELECT * FROM user_stats")

# 生产配置
prod_defs = dg.Definitions(
    assets=[user_stats],
    resources={"db": DatabaseResource(
        host="prod-db.example.com",
        database="analytics",
        user=dg.EnvVar("DB_USER"),
        password=dg.EnvVar("DB_PASSWORD"),
    )},
)

常见问题与排错

问题一:Asset 依赖不正确

资产之间的依赖通过函数参数名推导。参数名必须与上游资产名匹配:

@dg.asset
def raw_data():    # 资产名 = "raw_data"
    return ...

@dg.asset
def processed(raw_data):  # 参数名 "raw_data" → 依赖上游 "raw_data"
    return ...

问题二:I/O Manager 与自定义存储

# 如果不想用 I/O Manager,可以返回 None 并自己处理存储
@dg.asset(io_manager_key="no_io")
def custom_storage_asset():
    df = process_data()
    df.to_parquet("s3://bucket/output.parquet")
    # 不返回值,让 Dagster 知道资产已物化

问题三:开发 vs 生产资源切换

# 使用不同的 Definitions 实例
dev_defs = dg.Definitions(
    assets=all_assets,
    resources={"db": DatabaseResource(host="localhost", ...)},
)

prod_defs = dg.Definitions(
    assets=all_assets,
    resources={"db": DatabaseResource(host="prod-server", ...)},
)

问题四:性能优化 - 大量小资产

# 使用 multi_asset 减少开销
@dg.multi_asset(
    outs={
        "users": dg.AssetOut(),
        "orders": dg.AssetOut(),
        "products": dg.AssetOut(),
    }
)
def extract_all():
    return pd.read_csv("users.csv"), pd.read_csv("orders.csv"), pd.read_csv("products.csv")

问题五:如何调试

# 本地开发(推荐)
dagster dev

# 物化单个资产
dagster asset materialize --select raw_orders

# 运行测试
pytest tests/
# 在测试中执行资产
def test_daily_stats():
    result = dg.materialize([raw_orders, daily_stats])
    assert result.success
    df = result.output_for_node("daily_stats")
    assert len(df) > 0

参考资源

  • 官方文档:https://docs.dagster.io/
  • GitHub:https://github.com/dagster-io/dagster
  • Dagster University:https://courses.dagster.io/
  • Dagster Blog:https://dagster.io/blog
  • Slack 社区:https://dagster.io/slack
  • Dagster Cloud:https://dagster.cloud/
  • Dagster + dbt 指南:https://docs.dagster.io/integrations/dbt
  • 从 Airflow 迁移:https://docs.dagster.io/integrations/airflow