跳转至

Prefect 数据编排

一句话概述:Prefect 是现代Python工作流编排框架,用装饰器就能把普通Python函数变成可观测、可重试、可调度的数据管道。

核心知识点表

概念白话解释
Flow工作流的容器,相当于Airflow的DAG,但写法更简单
TaskFlow里的一个步骤,加个@task装饰器就行
Deployment把Flow"部署"到服务器上,让它能被调度或远程触发
Work Pool工作池,决定任务在哪里运行(本地、Docker、K8s等)
Automation自动化规则,"当X发生时,做Y"(比如任务失败时发通知)
Artifact任务产生的"制品"(表格、图表等),可以在UI上查看
Transaction3.0新增的事务机制,一组任务要么全成功,要么全回滚

版本信息(2026年5月)

  • 最新版本:Prefect 3.7.0
  • Python支持:3.9 - 3.13
  • Prefect 3.0 相比2.0是重大升级,事件系统开源、性能提升98%

安装配置

安装Prefect

# 创建虚拟环境
conda create -n prefect python=3.12 -y  # 创建环境
conda activate prefect  # 激活

# 安装Prefect(只需一行)
pip install prefect  # 安装最新版

# 验证安装
prefect version  # 显示版本号

# 启动本地Prefect Server(自带UI)
prefect server start  # 启动后访问 http://127.0.0.1:4200

配置Prefect Cloud(可选)

# 如果用Prefect Cloud(托管版,免费额度够学习用)
prefect cloud login  # 登录Cloud
# 或者用API Key
prefect cloud login --key YOUR_API_KEY  # 用API密钥登录

基本使用

第一个Flow

# my_flow.py
from prefect import flow, task  # 导入核心装饰器

@task(retries=3, retry_delay_seconds=60)  # 失败自动重试3次,间隔60秒
def extract_data():
    """提取数据"""
    print("从数据库提取数据...")
    return {"users": 100, "orders": 500}  # 返回提取的数据

@task(log_prints=True)  # 自动把print输出记录到日志
def transform_data(raw_data: dict):
    """转换数据"""
    total = raw_data["users"] + raw_data["orders"]
    print(f"共 {total} 条记录")  # 这行会出现在Prefect日志中
    return {"total": total}

@task
def load_data(data: dict):
    """加载数据"""
    print(f"写入 {data['total']} 条记录到数据仓库")

@flow(name="ETL Pipeline", log_prints=True)  # 定义工作流
def etl_pipeline():
    """完整的ETL流程"""
    raw = extract_data()  # 步骤1:提取
    transformed = transform_data(raw)  # 步骤2:转换
    load_data(transformed)  # 步骤3:加载

# 直接运行就行,不需要额外配置
if __name__ == "__main__":
    etl_pipeline()  # 运行Flow
# 运行
python my_flow.py  # 像普通Python脚本一样运行

参数化Flow

@flow
def process_data(
    date: str,  # 处理哪天的数据
    batch_size: int = 1000,  # 每批处理多少条,默认1000
):
    """带参数的工作流"""
    print(f"处理 {date} 的数据,批次大小 {batch_size}")

# 调用时传参
process_data(date="2026-05-13", batch_size=500)

子Flow

@flow  # 子Flow也是用@flow装饰
def data_quality_check(data: dict):
    """数据质量检查(子Flow)"""
    assert data["total"] > 0, "数据不能为空"
    print("数据质量检查通过")
    return True

@flow
def main_pipeline():
    """主流程"""
    raw = extract_data()
    transformed = transform_data(raw)
    data_quality_check(transformed)  # 调用子Flow
    load_data(transformed)

高级用法

部署与调度

# deploy_flow.py
from my_flow import etl_pipeline  # 导入你的Flow

if __name__ == "__main__":
    # 方式1:serve模式(最简单,适合开发)
    etl_pipeline.serve(
        name="etl-daily",  # 部署名称
        cron="0 6 * * *",  # 每天早上6点运行
        tags=["production", "etl"],  # 标签
    )
# 方式2:CLI部署
prefect deploy my_flow.py:etl_pipeline \
  --name "etl-daily" \
  --cron "0 6 * * *" \
  --pool default-agent-pool

事务(3.0新特性)

from prefect import task, flow
from prefect.transactions import transaction  # 导入事务

@task
def create_order(item: str):
    """创建订单"""
    print(f"创建订单: {item}")
    return {"order_id": 123, "item": item}

@task
def charge_payment(order: dict):
    """扣款"""
    print(f"订单 {order['order_id']} 扣款成功")

@task
def rollback_order(order: dict):
    """回滚订单(扣款失败时调用)"""
    print(f"回滚订单 {order['order_id']}")

@flow
def purchase_flow():
    """购买流程:创建订单 → 扣款,失败则回滚"""
    with transaction():  # 事务块:里面的任务要么全成功,要么全回滚
        order = create_order("Python书")
        charge_payment(order)

并发执行

from prefect import flow, task
from prefect.futures import wait  # 导入等待工具

@task
def process_file(filename: str):
    """处理单个文件"""
    print(f"处理 {filename}")
    return f"{filename} 处理完成"

@flow
def parallel_processing():
    """并行处理多个文件"""
    files = ["a.csv", "b.csv", "c.csv", "d.csv"]

    # submit() 提交任务到线程池,并行执行
    futures = [process_file.submit(f) for f in files]  # 4个任务同时跑

    # 等待所有任务完成
    results = [f.result() for f in futures]  # 收集结果
    print(results)

缓存结果

from prefect import task
from prefect.cache_policies import INPUTS  # 基于输入的缓存策略
from datetime import timedelta

@task(
    cache_policy=INPUTS,  # 相同输入 → 直接返回缓存结果
    cache_expiration=timedelta(hours=1),  # 缓存有效期1小时
)
def expensive_computation(x: int):
    """耗时计算,结果会被缓存"""
    import time
    time.sleep(10)  # 模拟耗时操作
    return x * 2

# 第一次调用:等10秒
# 第二次相同参数调用:直接返回,不等待

常见报错与解决

报错信息原因解决方案
Connection refusedPrefect Server没启动运行 prefect server start
Flow run failed: timeout任务超时给task加 timeout_seconds 参数
ImportError: no module依赖没装确保在Prefect运行的环境中 pip install
API key is invalidCloud API密钥过期prefect cloud login 重新登录
Work pool not found工作池不存在prefect work-pool create my-pool
prefect.exceptions.MissingResult任务没返回结果确保函数有return语句

速查表

# ===== CLI常用命令 =====
prefect server start           # 启动本地Server
prefect flow-run ls            # 列出最近的Flow运行记录
prefect deployment ls          # 列出所有部署
prefect deployment run name    # 手动触发部署
prefect work-pool ls           # 列出工作池
prefect work-pool create name  # 创建工作池
prefect worker start -p pool   # 启动Worker
prefect config view            # 查看当前配置

# ===== 装饰器参数速查 =====
# @task(retries=3)                  # 失败重试3次
# @task(retry_delay_seconds=30)     # 重试间隔30秒
# @task(timeout_seconds=300)        # 超时5分钟
# @task(log_prints=True)            # print自动记录日志
# @task(tags=["important"])         # 添加标签
# @flow(name="My Flow")            # Flow名称
# @flow(retries=2)                  # Flow级别重试

同类工具对比

特性Prefect 3Airflow 3Dagster 1.13
写法Python装饰器DAG + OperatorAsset为中心
上手难度★★(最低)★★★★★★
本地开发体验极好(直接python运行)一般(需启动多个服务)
事件驱动原生支持有限支持有限支持
事务回滚3.0支持不支持不支持
社区规模中(17k Stars)最大(39k Stars)中(12k Stars)
适合谁快速上手、中小团队企业级大型团队数据质量优先

面试建议:Prefect适合对比Airflow回答"你了解哪些编排工具"。Prefect的核心卖点是"零样板代码"——不需要配置文件,不需要DAG声明,加个装饰器就行。