跳转至

Prefect 3 工作流编排完全指南

为什么要学 Prefect

  1. Python 原生,最低侵入性:Prefect 用装饰器把普通 Python 函数变成可编排的工作流。不需要学 DSL、不需要写 YAML、不需要改变代码结构。你的业务逻辑就是普通 Python 函数,加个 @flow@task 装饰器就能获得重试、日志、监控、调度等能力。

  2. 动态工作流(DAG-Free):与 Airflow 的静态 DAG 不同,Prefect 的工作流是动态的——可以有条件分支、循环、动态并行。flow 就是函数,可以调用其他函数,用 if/for 等控制流。

  3. 本地开发 == 生产运行:本地 python my_flow.py 就能运行工作流,不需要启动调度器或 workers。部署到 Prefect Cloud 后行为完全相同,大幅简化了开发调试循环。

  4. Prefect Cloud 免运维:Prefect Cloud 提供托管的调度、监控、告警、RBAC。免费层就够小团队使用。不需要自己维护 Airflow 的 Web Server + Scheduler + Database + Worker。

  5. 现代化的可观测性:内置的 UI 提供 flow run 可视化、任务状态追踪、日志聚合、告警通知。每次运行都自动记录输入输出、耗时、重试次数等。


核心概念详解

Prefect 是什么(白话解释)

想象你在做菜:先洗菜、再切菜、然后炒菜、最后装盘。如果中间切菜的时候刀断了,你得重来。

Prefect 就是一个"智能厨房管理系统": - 记录每道工序的状态(完成了哪些、失败了哪些) - 失败的步骤自动重试 - 可以定时自动开始做菜(调度) - 出了问题发消息通知你(告警) - 有个监控屏幕看所有菜的进度(UI)

核心概念

概念说明代码
Flow工作流(主函数)@flow
Task工作单元(子函数)@task
Flow Run一次 flow 的执行自动创建
Task Run一次 task 的执行自动创建
Deploymentflow 的部署配置.deploy()
Work Pool执行环境配置Prefect UI/CLI
Worker执行 flow run 的进程prefect worker start
Schedule定时/Cron 调度Cron/Interval/RRule
Artifact运行产出物(表格、Markdown)create_markdown_artifact
Block可复用配置(凭证、连接等)Prefect UI/Code

Prefect vs Airflow vs Dagster 对比

特性Prefect 3Airflow 2Dagster
DAG 定义动态(Python函数)静态(DAG对象)静态(Assets/Ops)
核心抽象Flow + TaskDAG + OperatorAsset + Op
调度器Cloud 托管或自建自建(必需)Cloud 或自建
本地运行直接 python 执行需要完整环境直接 python 执行
部署复杂度
UICloud UI(免费层)自建 Web UICloud/OSS UI
重试装饰器参数Operator 参数Op 参数
参数传递Python 原生XCom(复杂)Python 原生
动态任务原生支持2.x TaskGroup原生支持
数据血缘有限有限核心特性(Assets)
学习曲线中高
社区活跃最大增长中
适合场景数据管道/ML/ETL大规模ETL数据平台/血缘

安装与配置

# 安装 Prefect
pip install prefect

# 验证
prefect version

# 启动本地 Prefect Server(可选)
prefect server start
# 打开 http://127.0.0.1:4200

# 或使用 Prefect Cloud(推荐)
prefect cloud login

Prefect Cloud 配置

# 注册 Prefect Cloud(https://app.prefect.cloud)
# 获取 API Key

# 登录
prefect cloud login -k "pnu_xxxxxxxxxxxx"

# 设置默认 workspace
prefect cloud workspace set --workspace "my-org/my-workspace"

项目结构

my-project/
├── flows/
│   ├── etl_flow.py
│   ├── ml_pipeline.py
│   └── report_flow.py
├── tasks/
│   ├── extract.py
│   ├── transform.py
│   └── load.py
├── prefect.yaml           # 部署配置
├── requirements.txt
└── .env

快速上手:5 分钟最小示例

# flow.py
from prefect import flow, task
from prefect.logging import get_run_logger
import httpx

@task(retries=3, retry_delay_seconds=10)
def fetch_data(url: str) -> dict:
    """从 API 获取数据"""
    logger = get_run_logger()
    logger.info(f"正在获取: {url}")
    response = httpx.get(url)
    response.raise_for_status()
    return response.json()

@task
def transform_data(raw: dict) -> list:
    """转换数据"""
    return [
        {"name": item["name"], "stars": item["stargazers_count"]}
        for item in raw
    ]

@task
def save_data(data: list) -> str:
    """保存数据"""
    import json
    with open("output.json", "w") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    return f"保存了 {len(data)} 条记录"

@flow(name="GitHub Trending", log_prints=True)
def github_flow(org: str = "prefecthq"):
    """获取 GitHub 组织的仓库信息"""
    print(f"开始处理组织: {org}")

    raw = fetch_data(f"https://api.github.com/orgs/{org}/repos?sort=stars&per_page=10")
    transformed = transform_data(raw)
    result = save_data(transformed)

    print(f"完成: {result}")
    return result

# 本地运行
if __name__ == "__main__":
    github_flow(org="prefecthq")

运行:

python flow.py
# 在 Prefect UI 中可以看到运行记录


进阶用法

场景一:ETL 管道 + 调度

from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def extract(source: str) -> pd.DataFrame:
    """提取数据(带缓存)"""
    if source == "database":
        return pd.read_sql("SELECT * FROM orders", conn)
    elif source == "csv":
        return pd.read_csv("data/orders.csv")
    elif source == "api":
        response = httpx.get("https://api.example.com/orders")
        return pd.DataFrame(response.json())

@task
def transform(df: pd.DataFrame) -> pd.DataFrame:
    """数据转换"""
    df["order_date"] = pd.to_datetime(df["order_date"])
    df["revenue"] = df["quantity"] * df["unit_price"]
    daily = df.groupby(df["order_date"].dt.date).agg(
        total_revenue=("revenue", "sum"),
        order_count=("order_id", "count"),
        avg_order_value=("revenue", "mean"),
    ).reset_index()
    return daily

@task
def load(df: pd.DataFrame, destination: str):
    """加载到目标"""
    if destination == "warehouse":
        df.to_sql("daily_revenue", engine, if_exists="replace")
    elif destination == "csv":
        df.to_csv("output/daily_revenue.csv", index=False)

@flow(name="Daily ETL")
def etl_pipeline(source: str = "database", destination: str = "warehouse"):
    raw = extract(source)
    transformed = transform(raw)
    load(transformed, destination)

# 部署并添加调度
if __name__ == "__main__":
    etl_pipeline.serve(
        name="daily-etl-deployment",
        cron="0 2 * * *",  # 每天凌晨2点
        tags=["etl", "production"],
        parameters={"source": "database", "destination": "warehouse"},
    )

场景二:动态并行任务

from prefect import flow, task

@task
def process_file(file_path: str) -> dict:
    """处理单个文件"""
    import pandas as pd
    df = pd.read_csv(file_path)
    return {
        "file": file_path,
        "rows": len(df),
        "columns": len(df.columns),
    }

@flow
def batch_processing(directory: str = "data/"):
    """并行处理目录中的所有文件"""
    import os

    files = [os.path.join(directory, f) for f in os.listdir(directory) if f.endswith(".csv")]

    # 方式一:自动并行(submit)
    futures = [process_file.submit(f) for f in files]
    results = [f.result() for f in futures]

    # 方式二:使用 map
    results = process_file.map(files)

    print(f"处理了 {len(results)} 个文件")
    return results

场景三:错误处理与通知

from prefect import flow, task
from prefect.blocks.notifications import SlackWebhook

@task(retries=3, retry_delay_seconds=[10, 30, 60])  # 递增重试间隔
def flaky_task():
    import random
    if random.random() < 0.5:
        raise Exception("随机失败!")
    return "成功"

@flow(
    name="Resilient Flow",
    on_failure=[notify_slack],   # 失败时通知
    on_completion=[log_success], # 完成时记录
)
def resilient_flow():
    try:
        result = flaky_task()
        return result
    except Exception as e:
        # 可以优雅地处理错误
        print(f"任务失败: {e}")
        return "降级结果"

async def notify_slack(flow, flow_run, state):
    slack = await SlackWebhook.load("my-slack-webhook")
    await slack.notify(f"Flow {flow.name} 失败了!状态: {state}")

def log_success(flow, flow_run, state):
    print(f"Flow {flow.name} 完成,状态: {state}")

场景四:子 Flow(嵌套工作流)

from prefect import flow, task

@task
def extract_users():
    return [{"id": 1, "name": "张三"}, {"id": 2, "name": "李四"}]

@task
def extract_orders():
    return [{"id": 1, "user_id": 1, "amount": 100}]

@flow(name="Extract")
def extract_flow():
    users = extract_users()
    orders = extract_orders()
    return {"users": users, "orders": orders}

@flow(name="Transform")
def transform_flow(data: dict):
    # 转换逻辑...
    return data

@flow(name="Load")
def load_flow(data: dict):
    # 加载逻辑...
    pass

@flow(name="Main ETL Pipeline")
def main_pipeline():
    """主管道调用子 flow"""
    data = extract_flow()        # 子 flow 1
    transformed = transform_flow(data)  # 子 flow 2
    load_flow(transformed)       # 子 flow 3

场景五:Artifact(产出物)

from prefect import flow, task
from prefect.artifacts import create_markdown_artifact, create_table_artifact

@task
def generate_report(data):
    # 创建 Markdown 报告
    create_markdown_artifact(
        key="daily-report",
        markdown=f"""
# 日报 - {data['date']}

## 关键指标
- 总订单: **{data['orders']}**
- 总收入: **¥{data['revenue']:,.0f}**
- 平均订单额: **¥{data['avg_order']:.0f}**

## 趋势
收入较昨日{'增长' if data['growth'] > 0 else '下降'} {abs(data['growth']):.1f}%
        """,
        description="每日业务报告",
    )

    # 创建表格
    create_table_artifact(
        key="top-products",
        table=[
            {"产品": "A", "销量": 100, "收入": 5000},
            {"产品": "B", "销量": 80, "收入": 4000},
        ],
        description="热销产品 Top 10",
    )

场景六:部署配置

# 方式一:.serve()(最简单,进程内运行)
if __name__ == "__main__":
    etl_pipeline.serve(
        name="my-etl",
        cron="0 * * * *",  # 每小时
    )

# 方式二:.deploy()(推荐生产环境)
if __name__ == "__main__":
    etl_pipeline.deploy(
        name="production-etl",
        work_pool_name="my-docker-pool",
        cron="0 2 * * *",
        tags=["production", "etl"],
        parameters={"source": "database"},
        image="my-registry/etl:latest",
    )
# prefect.yaml(声明式部署配置)
deployments:
  - name: daily-etl
    entrypoint: flows/etl.py:etl_pipeline
    work_pool:
      name: my-pool
    schedule:
      cron: "0 2 * * *"
    parameters:
      source: database
    tags:
      - production
      - etl
# 部署
prefect deploy --all

# 或特定部署
prefect deploy -n daily-etl

常见问题与排错

问题一:Task 结果太大导致序列化慢

# 使用 persist_result=False 避免序列化大结果
@task(persist_result=False)
def process_big_data():
    return huge_dataframe  # 不会序列化存储

# 或使用文件路径传递而不是传数据本身
@task
def save_intermediate(df):
    path = "/tmp/intermediate.parquet"
    df.to_parquet(path)
    return path  # 返回路径而不是数据

@task
def load_intermediate(path: str):
    return pd.read_parquet(path)

问题二:Flow 运行卡住

# 检查 worker 状态
prefect worker ls

# 检查 work pool
prefect work-pool ls

# 查看 flow run 状态
prefect flow-run ls --state RUNNING

问题三:重试策略

@task(
    retries=5,
    retry_delay_seconds=[10, 30, 60, 120, 300],  # 指数退避
    retry_condition_fn=lambda task, task_run, state: "timeout" in str(state.result()),
)
def selective_retry_task():
    ...

问题四:环境变量和密钥管理

from prefect.blocks.system import Secret

# 在 UI 或代码中创建 Secret Block
secret = Secret(value="my-api-key")
secret.save("my-api-key")

# 使用
@task
async def use_secret():
    secret = await Secret.load("my-api-key")
    api_key = secret.get()

问题五:Flow 本地 vs 部署行为不一致

确保本地环境和部署环境的依赖版本一致。使用 Docker 部署可以保证环境一致:

etl_pipeline.deploy(
    name="docker-etl",
    work_pool_name="docker-pool",
    image="python:3.11-slim",
    push=False,
)

问题六:并发限制

from prefect import flow, task
from prefect.concurrency.sync import concurrency

@task
def rate_limited_api_call(item):
    with concurrency("api-calls", occupy=1):  # 最多同时 N 个
        return httpx.get(f"https://api.example.com/{item}")

# 在 CLI 中设置并发限制
# prefect concurrency-limit create api-calls --limit 5

参考资源

  • 官方文档:https://docs.prefect.io/
  • GitHub:https://github.com/PrefectHQ/prefect
  • Prefect Cloud:https://app.prefect.cloud
  • 教程:https://docs.prefect.io/tutorials/
  • Recipes(实用示例):https://docs.prefect.io/recipes/
  • Prefect Blog:https://www.prefect.io/blog
  • Slack 社区:https://www.prefect.io/slack
  • Prefect Integrations:https://docs.prefect.io/integrations/