Metaflow ML 工作流¶
为什么要学¶
Metaflow 是 Netflix 开源的 ML 工作流框架,让数据科学家用 Python 就能管理从原型到生产的全流程:
- Python 原生:用装饰器定义工作流步骤,无需学新语言
- 无缝扩展:本地开发,一行代码切换到 AWS/云端执行
- 数据版本化:每次运行的数据和代码自动版本化
- 并行执行:内置 foreach 并行,自动管理
- 容错恢复:失败后从断点继续,无需重新开始
- Netflix 验证:Netflix ML 团队日常使用
适合 ML 实验管理、模型训练管道、数据处理 Pipeline。
核心概念¶
| 概念 | 说明 | 类比 |
|---|---|---|
| Flow | 完整工作流 | 程序/Pipeline |
| Step | 工作流中的一个步骤 | 函数 |
| Artifact | 步骤间传递的数据 | 变量(自动持久化) |
| Run | 一次工作流执行 | 一次实验 |
| foreach | 动态并行分支 | map/并行 |
| @retry | 自动重试 | 容错 |
| @resources | 计算资源声明 | 资源申请 |
| @card | 可视化报告 | 实验报告 |
安装配置¶
快速上手¶
from metaflow import FlowSpec, step, card, Parameter
class TrainingFlow(FlowSpec):
"""ML模型训练工作流"""
learning_rate = Parameter("lr", default=0.01, help="学习率")
@step
def start(self):
"""加载数据"""
import pandas as pd
self.data = pd.read_csv("train.csv")
print(f"数据量: {len(self.data)}")
self.next(self.preprocess)
@step
def preprocess(self):
"""数据预处理"""
from sklearn.model_selection import train_test_split
X = self.data.drop("target", axis=1)
y = self.data["target"]
self.X_train, self.X_test, self.y_train, self.y_test = \
train_test_split(X, y, test_size=0.2)
self.next(self.train)
@card
@step
def train(self):
"""训练模型"""
from sklearn.ensemble import RandomForestClassifier
self.model = RandomForestClassifier(n_estimators=100)
self.model.fit(self.X_train, self.y_train)
self.accuracy = self.model.score(self.X_test, self.y_test)
print(f"准确率: {self.accuracy:.4f}")
self.next(self.end)
@step
def end(self):
"""结束"""
print(f"最终准确率: {self.accuracy:.4f}")
if __name__ == "__main__":
TrainingFlow()
并行超参搜索¶
class HPSearchFlow(FlowSpec):
@step
def start(self):
self.hyperparams = [
{"lr": 0.001, "depth": 3},
{"lr": 0.01, "depth": 5},
{"lr": 0.1, "depth": 7},
]
self.next(self.train, foreach="hyperparams")
@step
def train(self):
"""并行训练多个配置"""
hp = self.input
# 训练模型...
self.score = train_model(hp)
self.hp = hp
self.next(self.join)
@step
def join(self, inputs):
"""汇总结果,选最佳"""
best = max(inputs, key=lambda x: x.score)
self.best_hp = best.hp
self.best_score = best.score
print(f"最佳配置: {self.best_hp}, 分数: {self.best_score}")
self.next(self.end)
@step
def end(self):
pass
进阶用法¶
云端扩展¶
from metaflow import resources, batch, retry
class BigDataFlow(FlowSpec):
@resources(cpu=4, memory=16000) # 16GB内存
@retry(times=2)
@step
def heavy_processing(self):
# 在大机器上运行
...
@batch(cpu=8, memory=32000, gpu=1) # GPU训练
@step
def train_gpu(self):
# 在AWS Batch GPU实例运行
...
访问历史运行¶
from metaflow import Flow
# 获取最近的运行
flow = Flow("TrainingFlow")
latest_run = flow.latest_run
print(f"准确率: {latest_run.data.accuracy}")
# 对比多次运行
for run in flow.runs():
print(f"Run {run.id}: accuracy={run.data.accuracy}")
常见问题¶
Q1: vs MLflow?¶
| 方面 | Metaflow | MLflow |
|---|---|---|
| 核心 | 工作流编排 | 实验追踪 |
| DAG定义 | Python类+装饰器 | (无DAG) |
| 数据版本 | 自动(artifact) | 手动log |
| 并行 | 原生foreach | 需要额外工具 |
| 生产部署 | 内置(AWS Step Functions) | 需要额外配置 |
| 互补性 | 可以和MLflow一起用 | 可以和Metaflow一起用 |
参考资源¶
- Metaflow 官网 - 官方网站
- Metaflow 文档 - 完整文档
- Metaflow GitHub - 源代码
- Tutorials - 官方教程