Prefect 数据编排
一句话概述:Prefect 是现代Python工作流编排框架,用装饰器就能把普通Python函数变成可观测、可重试、可调度的数据管道。
核心知识点表
| 概念 | 白话解释 |
|---|
| Flow | 工作流的容器,相当于Airflow的DAG,但写法更简单 |
| Task | Flow里的一个步骤,加个@task装饰器就行 |
| Deployment | 把Flow"部署"到服务器上,让它能被调度或远程触发 |
| Work Pool | 工作池,决定任务在哪里运行(本地、Docker、K8s等) |
| Automation | 自动化规则,"当X发生时,做Y"(比如任务失败时发通知) |
| Artifact | 任务产生的"制品"(表格、图表等),可以在UI上查看 |
| Transaction | 3.0新增的事务机制,一组任务要么全成功,要么全回滚 |
版本信息(2026年5月)
- 最新版本:Prefect 3.7.0
- Python支持:3.9 - 3.13
- Prefect 3.0 相比2.0是重大升级,事件系统开源、性能提升98%
安装配置
安装Prefect
# 创建虚拟环境
conda create -n prefect python=3.12 -y # 创建环境
conda activate prefect # 激活
# 安装Prefect(只需一行)
pip install prefect # 安装最新版
# 验证安装
prefect version # 显示版本号
# 启动本地Prefect Server(自带UI)
prefect server start # 启动后访问 http://127.0.0.1:4200
配置Prefect Cloud(可选)
# 如果用Prefect Cloud(托管版,免费额度够学习用)
prefect cloud login # 登录Cloud
# 或者用API Key
prefect cloud login --key YOUR_API_KEY # 用API密钥登录
基本使用
第一个Flow
# my_flow.py
from prefect import flow, task # 导入核心装饰器
@task(retries=3, retry_delay_seconds=60) # 失败自动重试3次,间隔60秒
def extract_data():
"""提取数据"""
print("从数据库提取数据...")
return {"users": 100, "orders": 500} # 返回提取的数据
@task(log_prints=True) # 自动把print输出记录到日志
def transform_data(raw_data: dict):
"""转换数据"""
total = raw_data["users"] + raw_data["orders"]
print(f"共 {total} 条记录") # 这行会出现在Prefect日志中
return {"total": total}
@task
def load_data(data: dict):
"""加载数据"""
print(f"写入 {data['total']} 条记录到数据仓库")
@flow(name="ETL Pipeline", log_prints=True) # 定义工作流
def etl_pipeline():
"""完整的ETL流程"""
raw = extract_data() # 步骤1:提取
transformed = transform_data(raw) # 步骤2:转换
load_data(transformed) # 步骤3:加载
# 直接运行就行,不需要额外配置
if __name__ == "__main__":
etl_pipeline() # 运行Flow
# 运行
python my_flow.py # 像普通Python脚本一样运行
参数化Flow
@flow
def process_data(
date: str, # 处理哪天的数据
batch_size: int = 1000, # 每批处理多少条,默认1000
):
"""带参数的工作流"""
print(f"处理 {date} 的数据,批次大小 {batch_size}")
# 调用时传参
process_data(date="2026-05-13", batch_size=500)
子Flow
@flow # 子Flow也是用@flow装饰
def data_quality_check(data: dict):
"""数据质量检查(子Flow)"""
assert data["total"] > 0, "数据不能为空"
print("数据质量检查通过")
return True
@flow
def main_pipeline():
"""主流程"""
raw = extract_data()
transformed = transform_data(raw)
data_quality_check(transformed) # 调用子Flow
load_data(transformed)
高级用法
部署与调度
# deploy_flow.py
from my_flow import etl_pipeline # 导入你的Flow
if __name__ == "__main__":
# 方式1:serve模式(最简单,适合开发)
etl_pipeline.serve(
name="etl-daily", # 部署名称
cron="0 6 * * *", # 每天早上6点运行
tags=["production", "etl"], # 标签
)
# 方式2:CLI部署
prefect deploy my_flow.py:etl_pipeline \
--name "etl-daily" \
--cron "0 6 * * *" \
--pool default-agent-pool
事务(3.0新特性)
from prefect import task, flow
from prefect.transactions import transaction # 导入事务
@task
def create_order(item: str):
"""创建订单"""
print(f"创建订单: {item}")
return {"order_id": 123, "item": item}
@task
def charge_payment(order: dict):
"""扣款"""
print(f"订单 {order['order_id']} 扣款成功")
@task
def rollback_order(order: dict):
"""回滚订单(扣款失败时调用)"""
print(f"回滚订单 {order['order_id']}")
@flow
def purchase_flow():
"""购买流程:创建订单 → 扣款,失败则回滚"""
with transaction(): # 事务块:里面的任务要么全成功,要么全回滚
order = create_order("Python书")
charge_payment(order)
并发执行
from prefect import flow, task
from prefect.futures import wait # 导入等待工具
@task
def process_file(filename: str):
"""处理单个文件"""
print(f"处理 {filename}")
return f"{filename} 处理完成"
@flow
def parallel_processing():
"""并行处理多个文件"""
files = ["a.csv", "b.csv", "c.csv", "d.csv"]
# submit() 提交任务到线程池,并行执行
futures = [process_file.submit(f) for f in files] # 4个任务同时跑
# 等待所有任务完成
results = [f.result() for f in futures] # 收集结果
print(results)
缓存结果
from prefect import task
from prefect.cache_policies import INPUTS # 基于输入的缓存策略
from datetime import timedelta
@task(
cache_policy=INPUTS, # 相同输入 → 直接返回缓存结果
cache_expiration=timedelta(hours=1), # 缓存有效期1小时
)
def expensive_computation(x: int):
"""耗时计算,结果会被缓存"""
import time
time.sleep(10) # 模拟耗时操作
return x * 2
# 第一次调用:等10秒
# 第二次相同参数调用:直接返回,不等待
常见报错与解决
| 报错信息 | 原因 | 解决方案 |
|---|
Connection refused | Prefect Server没启动 | 运行 prefect server start |
Flow run failed: timeout | 任务超时 | 给task加 timeout_seconds 参数 |
ImportError: no module | 依赖没装 | 确保在Prefect运行的环境中 pip install |
API key is invalid | Cloud API密钥过期 | prefect cloud login 重新登录 |
Work pool not found | 工作池不存在 | prefect work-pool create my-pool |
prefect.exceptions.MissingResult | 任务没返回结果 | 确保函数有return语句 |
速查表
# ===== CLI常用命令 =====
prefect server start # 启动本地Server
prefect flow-run ls # 列出最近的Flow运行记录
prefect deployment ls # 列出所有部署
prefect deployment run name # 手动触发部署
prefect work-pool ls # 列出工作池
prefect work-pool create name # 创建工作池
prefect worker start -p pool # 启动Worker
prefect config view # 查看当前配置
# ===== 装饰器参数速查 =====
# @task(retries=3) # 失败重试3次
# @task(retry_delay_seconds=30) # 重试间隔30秒
# @task(timeout_seconds=300) # 超时5分钟
# @task(log_prints=True) # print自动记录日志
# @task(tags=["important"]) # 添加标签
# @flow(name="My Flow") # Flow名称
# @flow(retries=2) # Flow级别重试
同类工具对比
| 特性 | Prefect 3 | Airflow 3 | Dagster 1.13 |
|---|
| 写法 | Python装饰器 | DAG + Operator | Asset为中心 |
| 上手难度 | ★★(最低) | ★★★ | ★★★ |
| 本地开发体验 | 极好(直接python运行) | 一般(需启动多个服务) | 好 |
| 事件驱动 | 原生支持 | 有限支持 | 有限支持 |
| 事务回滚 | 3.0支持 | 不支持 | 不支持 |
| 社区规模 | 中(17k Stars) | 最大(39k Stars) | 中(12k Stars) |
| 适合谁 | 快速上手、中小团队 | 企业级大型团队 | 数据质量优先 |
面试建议:Prefect适合对比Airflow回答"你了解哪些编排工具"。Prefect的核心卖点是"零样板代码"——不需要配置文件,不需要DAG声明,加个装饰器就行。