跳转至

Dagster 数据管道

一句话概述:Dagster 是以"数据资产"为中心的编排平台,不是告诉系统"做什么步骤",而是声明"我想要什么数据",让系统自动推断执行逻辑。

核心知识点表

概念白话解释
Asset数据资产,就是你想要的最终产物(一张表、一个模型、一个报告)
Op操作,一个具体的计算步骤(类似Airflow的Task)
Job把多个Op或Asset组合成一个可执行的"任务包"
Resource外部资源的连接(数据库、API等),类似Airflow的Connection
IO Manager管理数据怎么存、怎么读的组件(存到文件?数据库?)
Sensor传感器,监听外部事件并触发Job
Schedule定时调度,按Cron表达式触发
Partition分区,把大任务按时间或其他维度拆分(比如按天处理)
Dagster+Dagster的云服务(商业版)

版本信息(2026年5月)

  • 最新版本:Dagster 1.13.4
  • Python支持:3.9 - 3.14
  • 亮点:Dagster Skills、分区资产检查、虚拟资产

安装配置

# 创建虚拟环境
conda create -n dagster python=3.12 -y  # 创建Python 3.12环境
conda activate dagster  # 激活

# 安装Dagster(核心 + Web UI + 常用集成)
pip install dagster dagster-webserver  # 核心包 + Web界面

# 创建新项目(脚手架)
dagster project scaffold --name my_project  # 自动生成项目结构
cd my_project  # 进入项目

# 安装项目依赖
pip install -e ".[dev]"  # 以开发模式安装

# 启动开发服务器
dagster dev  # 启动后访问 http://localhost:3000

项目结构

my_project/
├── my_project/
│   ├── __init__.py        # 项目入口,注册资产和资源
│   ├── assets/            # 数据资产定义
│   │   └── __init__.py
│   ├── resources/         # 资源配置(数据库连接等)
│   └── jobs/              # Job定义
├── tests/                 # 测试
├── setup.py               # 包配置
└── pyproject.toml         # 项目配置

基本使用

定义数据资产

# my_project/assets/__init__.py
from dagster import asset, AssetExecutionContext  # 导入资产装饰器
import pandas as pd  # 导入pandas

@asset(
    group_name="raw_data",  # 资产分组
    description="从CSV读取原始用户数据",  # 描述(会显示在UI上)
)
def raw_users(context: AssetExecutionContext) -> pd.DataFrame:
    """读取原始用户数据"""
    df = pd.read_csv("data/users.csv")  # 读取CSV文件
    context.log.info(f"读取了 {len(df)} 条用户记录")  # 记录日志
    return df  # 返回DataFrame,Dagster自动存储

@asset(
    group_name="transformed",
    description="清洗后的用户数据",
)
def clean_users(raw_users: pd.DataFrame) -> pd.DataFrame:
    """清洗用户数据(自动依赖raw_users)"""
    # 参数名 raw_users 匹配上面的资产名,Dagster自动建立依赖
    df = raw_users.dropna()  # 删除空值
    df = df.drop_duplicates(subset=["email"])  # 按邮箱去重
    return df

@asset(
    group_name="analytics",
    description="用户统计报表",
)
def user_stats(clean_users: pd.DataFrame) -> dict:
    """生成用户统计"""
    stats = {
        "total_users": len(clean_users),  # 总用户数
        "avg_age": float(clean_users["age"].mean()),  # 平均年龄
    }
    return stats

注册资产到项目

# my_project/__init__.py
from dagster import Definitions, load_assets_from_modules  # 导入工具
from my_project import assets  # 导入资产模块

# 加载所有资产并注册
defs = Definitions(
    assets=load_assets_from_modules([assets]),  # 自动扫描assets模块
)

定义Job和Schedule

from dagster import (
    define_asset_job,  # 基于资产定义Job
    ScheduleDefinition,  # 定义调度
    AssetSelection,  # 选择哪些资产
)

# 定义一个Job:物化(生成)所有analytics组的资产
analytics_job = define_asset_job(
    name="refresh_analytics",  # Job名称
    selection=AssetSelection.groups("analytics"),  # 选择analytics组
)

# 定义调度:每天凌晨2点刷新
daily_schedule = ScheduleDefinition(
    job=analytics_job,  # 关联的Job
    cron_schedule="0 2 * * *",  # Cron表达式
)

高级用法

分区资产

from dagster import asset, DailyPartitionsDefinition  # 按天分区

# 定义按天分区
daily_partitions = DailyPartitionsDefinition(
    start_date="2026-01-01",  # 分区开始日期
)

@asset(
    partitions_def=daily_partitions,  # 应用分区定义
    description="每日销售数据",
)
def daily_sales(context: AssetExecutionContext) -> pd.DataFrame:
    """按天处理销售数据"""
    partition_date = context.partition_key  # 获取当前分区日期,如"2026-05-13"
    context.log.info(f"处理 {partition_date} 的数据")

    # 只查询当天的数据
    df = pd.read_sql(
        f"SELECT * FROM sales WHERE date = '{partition_date}'",
        con=db_connection,
    )
    return df

Resource资源管理

from dagster import resource, asset, Definitions, ConfigurableResource

# 定义可配置的资源
class DatabaseResource(ConfigurableResource):
    """数据库资源"""
    host: str  # 数据库地址
    port: int = 5432  # 端口,默认5432
    database: str  # 数据库名

    def query(self, sql: str):
        """执行SQL查询"""
        import psycopg2
        conn = psycopg2.connect(
            host=self.host,
            port=self.port,
            database=self.database,
        )
        return pd.read_sql(sql, conn)

@asset
def sales_data(db: DatabaseResource) -> pd.DataFrame:
    """用数据库资源查询销售数据"""
    return db.query("SELECT * FROM sales LIMIT 1000")

# 注册资源
defs = Definitions(
    assets=[sales_data],
    resources={
        "db": DatabaseResource(  # 注入数据库资源
            host="localhost",
            database="mydb",
        ),
    },
)

Sensor事件驱动

from dagster import sensor, RunRequest, SensorEvaluationContext
import os

@sensor(
    job=analytics_job,  # 触发哪个Job
    minimum_interval_seconds=30,  # 最小检查间隔30秒
)
def new_file_sensor(context: SensorEvaluationContext):
    """监听新文件到达"""
    data_dir = "/data/incoming"
    for filename in os.listdir(data_dir):
        filepath = os.path.join(data_dir, filename)
        if filename.endswith(".csv"):
            yield RunRequest(  # 发现新文件就触发一次Job
                run_key=filename,  # 用文件名做去重键,同一文件不会触发两次
                run_config={"filename": filepath},
            )

测试资产

# tests/test_assets.py
import pandas as pd
from my_project.assets import raw_users, clean_users, user_stats

def test_clean_users():
    """测试数据清洗逻辑"""
    # 构造测试数据(有空值和重复)
    test_data = pd.DataFrame({
        "name": ["Alice", "Bob", None, "Alice"],
        "email": ["a@test.com", "b@test.com", "c@test.com", "a@test.com"],
        "age": [25, 30, None, 25],
    })

    result = clean_users(test_data)  # 直接调用函数测试

    assert len(result) <= len(test_data)  # 清洗后不应更多
    assert result["email"].is_unique  # 邮箱应该唯一
    assert result.isna().sum().sum() == 0  # 没有空值

常见报错与解决

报错信息原因解决方案
DagsterInvalidDefinitionError资产定义有误(参数名和资产名不匹配)检查函数参数名是否和依赖的资产名一致
No module named dagster没安装或环境没激活conda activate dagster && pip install dagster
Asset key not found引用了不存在的资产检查资产是否在Definitions中注册
dagster dev 启动失败端口被占用dagster dev -p 3001 换端口
IOManager error数据存储配置有问题检查IO Manager配置,默认用文件系统
Partition key invalid分区键格式不对确保日期格式匹配分区定义

速查表

# ===== CLI命令 =====
dagster dev                      # 启动开发服务器
dagster dev -p 3001              # 指定端口
dagster project scaffold --name X  # 创建新项目
dagster asset materialize -m X   # 物化资产
dagster job execute -m X -j Y    # 执行Job
dagster instance info            # 显示实例信息

# ===== 常用装饰器 =====
# @asset                          基本资产
# @asset(group_name="X")          资产分组
# @asset(partitions_def=X)        分区资产
# @asset(deps=["other_asset"])    显式依赖
# @asset(key_prefix="schema")     资产键前缀
# @multi_asset                    一个函数产出多个资产
# @op                             基本操作
# @job                            定义Job
# @sensor                         事件传感器
# @schedule                       定时调度

同类工具对比

特性DagsterAirflowPrefect
核心理念数据资产为中心任务编排为中心Python函数为中心
数据血缘原生自动生成需额外配置有限支持
测试友好度★★★★★(最好)★★★★★★★
数据质量检查内置Asset Check需插件需手写
适合场景数据质量优先的团队大型企业级流程快速上手的小团队

面试建议:Dagster最大的差异化卖点是"Asset-centric"(资产为中心),而不是"Task-centric"(任务为中心)。面试时可以说:"Dagster关注的是数据产物本身,而不是产生数据的步骤,这让数据血缘和质量检查变得很自然。"