跳转至

Metaflow ML 工作流

为什么要学

Metaflow 是 Netflix 开源的 ML 工作流框架,让数据科学家用 Python 就能管理从原型到生产的全流程:

  • Python 原生:用装饰器定义工作流步骤,无需学新语言
  • 无缝扩展:本地开发,一行代码切换到 AWS/云端执行
  • 数据版本化:每次运行的数据和代码自动版本化
  • 并行执行:内置 foreach 并行,自动管理
  • 容错恢复:失败后从断点继续,无需重新开始
  • Netflix 验证:Netflix ML 团队日常使用

适合 ML 实验管理、模型训练管道、数据处理 Pipeline。

核心概念

概念说明类比
Flow完整工作流程序/Pipeline
Step工作流中的一个步骤函数
Artifact步骤间传递的数据变量(自动持久化)
Run一次工作流执行一次实验
foreach动态并行分支map/并行
@retry自动重试容错
@resources计算资源声明资源申请
@card可视化报告实验报告

安装配置

pip install metaflow

# AWS集成(可选)
pip install metaflow[aws]

快速上手

from metaflow import FlowSpec, step, card, Parameter

class TrainingFlow(FlowSpec):
    """ML模型训练工作流"""

    learning_rate = Parameter("lr", default=0.01, help="学习率")

    @step
    def start(self):
        """加载数据"""
        import pandas as pd
        self.data = pd.read_csv("train.csv")
        print(f"数据量: {len(self.data)}")
        self.next(self.preprocess)

    @step
    def preprocess(self):
        """数据预处理"""
        from sklearn.model_selection import train_test_split
        X = self.data.drop("target", axis=1)
        y = self.data["target"]
        self.X_train, self.X_test, self.y_train, self.y_test = \
            train_test_split(X, y, test_size=0.2)
        self.next(self.train)

    @card
    @step
    def train(self):
        """训练模型"""
        from sklearn.ensemble import RandomForestClassifier
        self.model = RandomForestClassifier(n_estimators=100)
        self.model.fit(self.X_train, self.y_train)
        self.accuracy = self.model.score(self.X_test, self.y_test)
        print(f"准确率: {self.accuracy:.4f}")
        self.next(self.end)

    @step
    def end(self):
        """结束"""
        print(f"最终准确率: {self.accuracy:.4f}")

if __name__ == "__main__":
    TrainingFlow()
# 运行
python train_flow.py run --lr 0.001

# 查看历史
python train_flow.py show

并行超参搜索

class HPSearchFlow(FlowSpec):

    @step
    def start(self):
        self.hyperparams = [
            {"lr": 0.001, "depth": 3},
            {"lr": 0.01, "depth": 5},
            {"lr": 0.1, "depth": 7},
        ]
        self.next(self.train, foreach="hyperparams")

    @step
    def train(self):
        """并行训练多个配置"""
        hp = self.input
        # 训练模型...
        self.score = train_model(hp)
        self.hp = hp
        self.next(self.join)

    @step
    def join(self, inputs):
        """汇总结果,选最佳"""
        best = max(inputs, key=lambda x: x.score)
        self.best_hp = best.hp
        self.best_score = best.score
        print(f"最佳配置: {self.best_hp}, 分数: {self.best_score}")
        self.next(self.end)

    @step
    def end(self):
        pass

进阶用法

云端扩展

from metaflow import resources, batch, retry

class BigDataFlow(FlowSpec):

    @resources(cpu=4, memory=16000)  # 16GB内存
    @retry(times=2)
    @step
    def heavy_processing(self):
        # 在大机器上运行
        ...

    @batch(cpu=8, memory=32000, gpu=1)  # GPU训练
    @step
    def train_gpu(self):
        # 在AWS Batch GPU实例运行
        ...

访问历史运行

from metaflow import Flow

# 获取最近的运行
flow = Flow("TrainingFlow")
latest_run = flow.latest_run

print(f"准确率: {latest_run.data.accuracy}")

# 对比多次运行
for run in flow.runs():
    print(f"Run {run.id}: accuracy={run.data.accuracy}")

常见问题

Q1: vs MLflow?

方面MetaflowMLflow
核心工作流编排实验追踪
DAG定义Python类+装饰器(无DAG)
数据版本自动(artifact)手动log
并行原生foreach需要额外工具
生产部署内置(AWS Step Functions)需要额外配置
互补性可以和MLflow一起用可以和Metaflow一起用

参考资源