Prefect 3 工作流编排完全指南¶
为什么要学 Prefect¶
Python 原生,最低侵入性:Prefect 用装饰器把普通 Python 函数变成可编排的工作流。不需要学 DSL、不需要写 YAML、不需要改变代码结构。你的业务逻辑就是普通 Python 函数,加个
@flow或@task装饰器就能获得重试、日志、监控、调度等能力。动态工作流(DAG-Free):与 Airflow 的静态 DAG 不同,Prefect 的工作流是动态的——可以有条件分支、循环、动态并行。flow 就是函数,可以调用其他函数,用 if/for 等控制流。
本地开发 == 生产运行:本地
python my_flow.py就能运行工作流,不需要启动调度器或 workers。部署到 Prefect Cloud 后行为完全相同,大幅简化了开发调试循环。Prefect Cloud 免运维:Prefect Cloud 提供托管的调度、监控、告警、RBAC。免费层就够小团队使用。不需要自己维护 Airflow 的 Web Server + Scheduler + Database + Worker。
现代化的可观测性:内置的 UI 提供 flow run 可视化、任务状态追踪、日志聚合、告警通知。每次运行都自动记录输入输出、耗时、重试次数等。
核心概念详解¶
Prefect 是什么(白话解释)¶
想象你在做菜:先洗菜、再切菜、然后炒菜、最后装盘。如果中间切菜的时候刀断了,你得重来。
Prefect 就是一个"智能厨房管理系统": - 记录每道工序的状态(完成了哪些、失败了哪些) - 失败的步骤自动重试 - 可以定时自动开始做菜(调度) - 出了问题发消息通知你(告警) - 有个监控屏幕看所有菜的进度(UI)
核心概念¶
| 概念 | 说明 | 代码 |
|---|---|---|
| Flow | 工作流(主函数) | @flow |
| Task | 工作单元(子函数) | @task |
| Flow Run | 一次 flow 的执行 | 自动创建 |
| Task Run | 一次 task 的执行 | 自动创建 |
| Deployment | flow 的部署配置 | .deploy() |
| Work Pool | 执行环境配置 | Prefect UI/CLI |
| Worker | 执行 flow run 的进程 | prefect worker start |
| Schedule | 定时/Cron 调度 | Cron/Interval/RRule |
| Artifact | 运行产出物(表格、Markdown) | create_markdown_artifact |
| Block | 可复用配置(凭证、连接等) | Prefect UI/Code |
Prefect vs Airflow vs Dagster 对比¶
| 特性 | Prefect 3 | Airflow 2 | Dagster |
|---|---|---|---|
| DAG 定义 | 动态(Python函数) | 静态(DAG对象) | 静态(Assets/Ops) |
| 核心抽象 | Flow + Task | DAG + Operator | Asset + Op |
| 调度器 | Cloud 托管或自建 | 自建(必需) | Cloud 或自建 |
| 本地运行 | 直接 python 执行 | 需要完整环境 | 直接 python 执行 |
| 部署复杂度 | 低 | 高 | 中 |
| UI | Cloud UI(免费层) | 自建 Web UI | Cloud/OSS UI |
| 重试 | 装饰器参数 | Operator 参数 | Op 参数 |
| 参数传递 | Python 原生 | XCom(复杂) | Python 原生 |
| 动态任务 | 原生支持 | 2.x TaskGroup | 原生支持 |
| 数据血缘 | 有限 | 有限 | 核心特性(Assets) |
| 学习曲线 | 低 | 中高 | 中 |
| 社区 | 活跃 | 最大 | 增长中 |
| 适合场景 | 数据管道/ML/ETL | 大规模ETL | 数据平台/血缘 |
安装与配置¶
# 安装 Prefect
pip install prefect
# 验证
prefect version
# 启动本地 Prefect Server(可选)
prefect server start
# 打开 http://127.0.0.1:4200
# 或使用 Prefect Cloud(推荐)
prefect cloud login
Prefect Cloud 配置¶
# 注册 Prefect Cloud(https://app.prefect.cloud)
# 获取 API Key
# 登录
prefect cloud login -k "pnu_xxxxxxxxxxxx"
# 设置默认 workspace
prefect cloud workspace set --workspace "my-org/my-workspace"
项目结构¶
my-project/
├── flows/
│ ├── etl_flow.py
│ ├── ml_pipeline.py
│ └── report_flow.py
├── tasks/
│ ├── extract.py
│ ├── transform.py
│ └── load.py
├── prefect.yaml # 部署配置
├── requirements.txt
└── .env
快速上手:5 分钟最小示例¶
# flow.py
from prefect import flow, task
from prefect.logging import get_run_logger
import httpx
@task(retries=3, retry_delay_seconds=10)
def fetch_data(url: str) -> dict:
"""从 API 获取数据"""
logger = get_run_logger()
logger.info(f"正在获取: {url}")
response = httpx.get(url)
response.raise_for_status()
return response.json()
@task
def transform_data(raw: dict) -> list:
"""转换数据"""
return [
{"name": item["name"], "stars": item["stargazers_count"]}
for item in raw
]
@task
def save_data(data: list) -> str:
"""保存数据"""
import json
with open("output.json", "w") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
return f"保存了 {len(data)} 条记录"
@flow(name="GitHub Trending", log_prints=True)
def github_flow(org: str = "prefecthq"):
"""获取 GitHub 组织的仓库信息"""
print(f"开始处理组织: {org}")
raw = fetch_data(f"https://api.github.com/orgs/{org}/repos?sort=stars&per_page=10")
transformed = transform_data(raw)
result = save_data(transformed)
print(f"完成: {result}")
return result
# 本地运行
if __name__ == "__main__":
github_flow(org="prefecthq")
运行:
进阶用法¶
场景一:ETL 管道 + 调度¶
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def extract(source: str) -> pd.DataFrame:
"""提取数据(带缓存)"""
if source == "database":
return pd.read_sql("SELECT * FROM orders", conn)
elif source == "csv":
return pd.read_csv("data/orders.csv")
elif source == "api":
response = httpx.get("https://api.example.com/orders")
return pd.DataFrame(response.json())
@task
def transform(df: pd.DataFrame) -> pd.DataFrame:
"""数据转换"""
df["order_date"] = pd.to_datetime(df["order_date"])
df["revenue"] = df["quantity"] * df["unit_price"]
daily = df.groupby(df["order_date"].dt.date).agg(
total_revenue=("revenue", "sum"),
order_count=("order_id", "count"),
avg_order_value=("revenue", "mean"),
).reset_index()
return daily
@task
def load(df: pd.DataFrame, destination: str):
"""加载到目标"""
if destination == "warehouse":
df.to_sql("daily_revenue", engine, if_exists="replace")
elif destination == "csv":
df.to_csv("output/daily_revenue.csv", index=False)
@flow(name="Daily ETL")
def etl_pipeline(source: str = "database", destination: str = "warehouse"):
raw = extract(source)
transformed = transform(raw)
load(transformed, destination)
# 部署并添加调度
if __name__ == "__main__":
etl_pipeline.serve(
name="daily-etl-deployment",
cron="0 2 * * *", # 每天凌晨2点
tags=["etl", "production"],
parameters={"source": "database", "destination": "warehouse"},
)
场景二:动态并行任务¶
from prefect import flow, task
@task
def process_file(file_path: str) -> dict:
"""处理单个文件"""
import pandas as pd
df = pd.read_csv(file_path)
return {
"file": file_path,
"rows": len(df),
"columns": len(df.columns),
}
@flow
def batch_processing(directory: str = "data/"):
"""并行处理目录中的所有文件"""
import os
files = [os.path.join(directory, f) for f in os.listdir(directory) if f.endswith(".csv")]
# 方式一:自动并行(submit)
futures = [process_file.submit(f) for f in files]
results = [f.result() for f in futures]
# 方式二:使用 map
results = process_file.map(files)
print(f"处理了 {len(results)} 个文件")
return results
场景三:错误处理与通知¶
from prefect import flow, task
from prefect.blocks.notifications import SlackWebhook
@task(retries=3, retry_delay_seconds=[10, 30, 60]) # 递增重试间隔
def flaky_task():
import random
if random.random() < 0.5:
raise Exception("随机失败!")
return "成功"
@flow(
name="Resilient Flow",
on_failure=[notify_slack], # 失败时通知
on_completion=[log_success], # 完成时记录
)
def resilient_flow():
try:
result = flaky_task()
return result
except Exception as e:
# 可以优雅地处理错误
print(f"任务失败: {e}")
return "降级结果"
async def notify_slack(flow, flow_run, state):
slack = await SlackWebhook.load("my-slack-webhook")
await slack.notify(f"Flow {flow.name} 失败了!状态: {state}")
def log_success(flow, flow_run, state):
print(f"Flow {flow.name} 完成,状态: {state}")
场景四:子 Flow(嵌套工作流)¶
from prefect import flow, task
@task
def extract_users():
return [{"id": 1, "name": "张三"}, {"id": 2, "name": "李四"}]
@task
def extract_orders():
return [{"id": 1, "user_id": 1, "amount": 100}]
@flow(name="Extract")
def extract_flow():
users = extract_users()
orders = extract_orders()
return {"users": users, "orders": orders}
@flow(name="Transform")
def transform_flow(data: dict):
# 转换逻辑...
return data
@flow(name="Load")
def load_flow(data: dict):
# 加载逻辑...
pass
@flow(name="Main ETL Pipeline")
def main_pipeline():
"""主管道调用子 flow"""
data = extract_flow() # 子 flow 1
transformed = transform_flow(data) # 子 flow 2
load_flow(transformed) # 子 flow 3
场景五:Artifact(产出物)¶
from prefect import flow, task
from prefect.artifacts import create_markdown_artifact, create_table_artifact
@task
def generate_report(data):
# 创建 Markdown 报告
create_markdown_artifact(
key="daily-report",
markdown=f"""
# 日报 - {data['date']}
## 关键指标
- 总订单: **{data['orders']}**
- 总收入: **¥{data['revenue']:,.0f}**
- 平均订单额: **¥{data['avg_order']:.0f}**
## 趋势
收入较昨日{'增长' if data['growth'] > 0 else '下降'} {abs(data['growth']):.1f}%
""",
description="每日业务报告",
)
# 创建表格
create_table_artifact(
key="top-products",
table=[
{"产品": "A", "销量": 100, "收入": 5000},
{"产品": "B", "销量": 80, "收入": 4000},
],
description="热销产品 Top 10",
)
场景六:部署配置¶
# 方式一:.serve()(最简单,进程内运行)
if __name__ == "__main__":
etl_pipeline.serve(
name="my-etl",
cron="0 * * * *", # 每小时
)
# 方式二:.deploy()(推荐生产环境)
if __name__ == "__main__":
etl_pipeline.deploy(
name="production-etl",
work_pool_name="my-docker-pool",
cron="0 2 * * *",
tags=["production", "etl"],
parameters={"source": "database"},
image="my-registry/etl:latest",
)
# prefect.yaml(声明式部署配置)
deployments:
- name: daily-etl
entrypoint: flows/etl.py:etl_pipeline
work_pool:
name: my-pool
schedule:
cron: "0 2 * * *"
parameters:
source: database
tags:
- production
- etl
常见问题与排错¶
问题一:Task 结果太大导致序列化慢¶
# 使用 persist_result=False 避免序列化大结果
@task(persist_result=False)
def process_big_data():
return huge_dataframe # 不会序列化存储
# 或使用文件路径传递而不是传数据本身
@task
def save_intermediate(df):
path = "/tmp/intermediate.parquet"
df.to_parquet(path)
return path # 返回路径而不是数据
@task
def load_intermediate(path: str):
return pd.read_parquet(path)
问题二:Flow 运行卡住¶
# 检查 worker 状态
prefect worker ls
# 检查 work pool
prefect work-pool ls
# 查看 flow run 状态
prefect flow-run ls --state RUNNING
问题三:重试策略¶
@task(
retries=5,
retry_delay_seconds=[10, 30, 60, 120, 300], # 指数退避
retry_condition_fn=lambda task, task_run, state: "timeout" in str(state.result()),
)
def selective_retry_task():
...
问题四:环境变量和密钥管理¶
from prefect.blocks.system import Secret
# 在 UI 或代码中创建 Secret Block
secret = Secret(value="my-api-key")
secret.save("my-api-key")
# 使用
@task
async def use_secret():
secret = await Secret.load("my-api-key")
api_key = secret.get()
问题五:Flow 本地 vs 部署行为不一致¶
确保本地环境和部署环境的依赖版本一致。使用 Docker 部署可以保证环境一致:
etl_pipeline.deploy(
name="docker-etl",
work_pool_name="docker-pool",
image="python:3.11-slim",
push=False,
)
问题六:并发限制¶
from prefect import flow, task
from prefect.concurrency.sync import concurrency
@task
def rate_limited_api_call(item):
with concurrency("api-calls", occupy=1): # 最多同时 N 个
return httpx.get(f"https://api.example.com/{item}")
# 在 CLI 中设置并发限制
# prefect concurrency-limit create api-calls --limit 5
参考资源¶
- 官方文档:https://docs.prefect.io/
- GitHub:https://github.com/PrefectHQ/prefect
- Prefect Cloud:https://app.prefect.cloud
- 教程:https://docs.prefect.io/tutorials/
- Recipes(实用示例):https://docs.prefect.io/recipes/
- Prefect Blog:https://www.prefect.io/blog
- Slack 社区:https://www.prefect.io/slack
- Prefect Integrations:https://docs.prefect.io/integrations/