Hamilton 微流程完全指南¶
为什么要学 Hamilton¶
函数即节点,极度简洁:Hamilton 用 Python 函数的名字和参数自动构建 DAG。函数名就是数据节点名,参数名引用上游节点。不需要装饰器、不需要声明依赖、不需要配置文件。这是最简洁的数据管道定义方式。
类型驱动,自文档化:Hamilton 强制使用 Python 类型注解。类型注解既是文档也是验证机制。看一眼函数签名就知道输入输出类型,DAG 关系一目了然。
DAG 可视化:
dr.visualize_execution()一行代码生成 DAG 图,清晰展示数据流向。这在 ML 特征工程中尤其有价值——你可以直观看到每个特征的计算路径。轻量级,零框架锁定:Hamilton 不是 Airflow/Prefect 那种重量级编排框架。它只是一个小巧的 Python 库(~200KB),负责组织你的函数调用。可以在 Jupyter Notebook、脚本、微服务中使用,也可以嵌入 Airflow/Prefect 作为单个任务。
ML 特征工程的最佳拍档:Hamilton 诞生于 Stitch Fix 的 ML 团队,专门解决特征工程中的痛点:特征复用、版本管理、血缘追踪、测试。如果你做 ML 特征工程,Hamilton 是目前最优雅的解决方案。
核心概念详解¶
Hamilton 是什么(白话解释)¶
假设你在做一份数据分析:先加载原始数据,然后清洗、计算特征、汇总统计。每个步骤是一个函数。
普通做法:
raw = load_data()
cleaned = clean(raw)
feature_a = calc_feature_a(cleaned)
feature_b = calc_feature_b(cleaned)
result = summarize(feature_a, feature_b)
Hamilton 的做法:你只需要定义函数,Hamilton 自动理解它们之间的依赖关系(通过参数名匹配函数名),自动按正确顺序执行。
核心原则¶
| 原则 | 说明 |
|---|---|
| 函数名 = 节点名 | 函数名就是数据流图中的节点标识 |
| 参数名 = 依赖 | 函数参数名引用其他函数的输出 |
| 类型注解必须 | 每个函数必须有类型注解 |
| 无副作用 | 函数应该是纯函数(相同输入 → 相同输出) |
| 确定性 DAG | DAG 在定义时就完全确定 |
Hamilton 工作流程¶
步骤 1: 定义函数模块(functions.py)
↓
步骤 2: 创建 Driver
↓
步骤 3: Driver 扫描模块,构建 DAG
↓
步骤 4: 调用 dr.execute(),指定需要的输出
↓
步骤 5: Hamilton 自动执行必要的函数链
Hamilton vs Pandas pipe vs 手动调用对比¶
| 特性 | Hamilton | Pandas pipe | 手动函数调用 |
|---|---|---|---|
| 依赖管理 | 自动推导 | 无 | 手动 |
| DAG可视化 | 内置 | 无 | 无 |
| 复用性 | 高(模块化) | 低 | 中 |
| 可测试性 | 高(纯函数) | 中 | 看代码质量 |
| 文档化 | 类型注解自文档 | 无 | 需手动 |
| 血缘追踪 | 自动 | 无 | 手动 |
| 并行执行 | 支持 | 不支持 | 手动 |
| 增量执行 | 支持 | 不支持 | 手动 |
| 代码量 | 最少 | 中 | 最多 |
| 学习曲线 | 低(就是写函数) | 极低 | 极低 |
安装与配置¶
# 基本安装
pip install sf-hamilton
# 带可视化支持
pip install "sf-hamilton[visualization]"
# 带 Pandas 支持
pip install "sf-hamilton[pandas]"
# 带 Polars 支持
pip install "sf-hamilton[polars]"
# 带 PySpark 支持
pip install "sf-hamilton[pyspark]"
# 全部安装
pip install "sf-hamilton[visualization,pandas,polars]"
项目结构(推荐)¶
my_ml_project/
├── features/ # 特征定义模块
│ ├── __init__.py
│ ├── user_features.py # 用户相关特征
│ ├── order_features.py # 订单相关特征
│ └── text_features.py # 文本特征
├── transforms/ # 数据转换模块
│ ├── __init__.py
│ ├── cleaning.py
│ └── aggregation.py
├── pipelines/
│ ├── training.py # 训练 Pipeline Driver
│ └── inference.py # 推理 Pipeline Driver
├── tests/
│ ├── test_features.py
│ └── test_transforms.py
└── notebooks/
└── exploration.ipynb
快速上手:5 分钟最小示例¶
features.py(定义函数):
import pandas as pd
def raw_data(data_path: str) -> pd.DataFrame:
"""加载原始数据"""
return pd.read_csv(data_path)
def cleaned_data(raw_data: pd.DataFrame) -> pd.DataFrame:
"""清洗数据:去重、去空值"""
return raw_data.drop_duplicates().dropna()
def total_amount(cleaned_data: pd.DataFrame) -> pd.Series:
"""计算总金额"""
return cleaned_data["quantity"] * cleaned_data["unit_price"]
def avg_amount(total_amount: pd.Series) -> float:
"""计算平均金额"""
return total_amount.mean()
def high_value_flag(total_amount: pd.Series, threshold: float = 100.0) -> pd.Series:
"""标记高价值订单"""
return total_amount > threshold
def summary_stats(total_amount: pd.Series, high_value_flag: pd.Series) -> dict:
"""汇总统计"""
return {
"mean": total_amount.mean(),
"median": total_amount.median(),
"high_value_pct": high_value_flag.mean() * 100,
"count": len(total_amount),
}
run.py(Driver 执行):
from hamilton import driver
import features # 导入函数模块
# 创建 Driver
dr = driver.Builder().with_modules(features).build()
# 可视化 DAG
dr.display_all_functions("dag.png")
# 执行:指定需要的输出和输入参数
result = dr.execute(
final_vars=["summary_stats", "avg_amount"], # 需要的输出
inputs={"data_path": "orders.csv", "threshold": 200.0}, # 输入参数
)
print(result["summary_stats"])
print(f"平均金额: {result['avg_amount']:.2f}")
运行:
DAG 自动推导结果:
data_path ─→ raw_data ─→ cleaned_data ─→ total_amount ─→ avg_amount
↓ ↓
threshold ─→ high_value_flag │
↓ │
summary_stats ←─────┘
进阶用法¶
场景一:ML 特征工程¶
# user_features.py
import pandas as pd
import numpy as np
def user_age(birth_date: pd.Series) -> pd.Series:
"""用户年龄"""
return (pd.Timestamp.now() - pd.to_datetime(birth_date)).dt.days / 365.25
def user_tenure_days(signup_date: pd.Series) -> pd.Series:
"""注册天数"""
return (pd.Timestamp.now() - pd.to_datetime(signup_date)).dt.days
def user_is_new(user_tenure_days: pd.Series, new_user_threshold: int = 30) -> pd.Series:
"""是否新用户"""
return user_tenure_days < new_user_threshold
def total_orders(order_count: pd.Series) -> pd.Series:
"""总订单数(直接透传,命名规范化)"""
return order_count
def avg_order_value(total_spend: pd.Series, total_orders: pd.Series) -> pd.Series:
"""平均订单价值"""
return total_spend / total_orders.clip(lower=1)
def user_segment(avg_order_value: pd.Series, total_orders: pd.Series) -> pd.Series:
"""用户分层"""
conditions = [
(avg_order_value > 500) & (total_orders > 10),
(avg_order_value > 200) | (total_orders > 5),
]
choices = ["VIP", "常客"]
return pd.Series(np.select(conditions, choices, default="普通"), name="user_segment")
def log_total_spend(total_spend: pd.Series) -> pd.Series:
"""总消费对数变换"""
return np.log1p(total_spend)
def days_since_last_order(last_order_date: pd.Series) -> pd.Series:
"""距上次下单天数"""
return (pd.Timestamp.now() - pd.to_datetime(last_order_date)).dt.days
# training_pipeline.py
from hamilton import driver
import user_features
dr = driver.Builder().with_modules(user_features).build()
# 输入是已有的原始列
raw_df = pd.read_parquet("users.parquet")
inputs = {col: raw_df[col] for col in raw_df.columns}
inputs["new_user_threshold"] = 90
# 指定需要的特征列
feature_columns = [
"user_age", "user_tenure_days", "user_is_new",
"avg_order_value", "user_segment",
"log_total_spend", "days_since_last_order",
]
result = dr.execute(final_vars=feature_columns, inputs=inputs)
# 组合成特征矩阵
feature_df = pd.DataFrame(result)
场景二:装饰器增强¶
from hamilton.function_modifiers import (
extract_columns, parameterize, check_output, tag
)
import pandas as pd
import pandera as pa
# extract_columns:从 DataFrame 提取多列
@extract_columns("user_id", "name", "email", "signup_date")
def raw_users(users_path: str) -> pd.DataFrame:
return pd.read_csv(users_path)
# parameterize:生成多个相似的函数
@parameterize(
total_sales_7d={"window_days": 7},
total_sales_30d={"window_days": 30},
total_sales_90d={"window_days": 90},
)
def total_sales_Xd(order_dates: pd.Series, amounts: pd.Series, window_days: int) -> pd.Series:
cutoff = pd.Timestamp.now() - pd.Timedelta(days=window_days)
mask = pd.to_datetime(order_dates) >= cutoff
return amounts.where(mask, 0)
# check_output:输出验证
@check_output(
schema=pa.SeriesSchema(
float,
checks=[
pa.Check.greater_than(0),
pa.Check.less_than(1_000_000),
]
)
)
def revenue(quantity: pd.Series, unit_price: pd.Series) -> pd.Series:
return quantity * unit_price
# tag:为函数添加标签
@tag(owner="data-team", pii="false", importance="high")
def critical_metric(revenue: pd.Series) -> float:
return revenue.sum()
场景三:多后端支持(Pandas → Polars → Spark)¶
# 同一份逻辑,不同后端
# pandas_features.py
import pandas as pd
def revenue(quantity: pd.Series, price: pd.Series) -> pd.Series:
return quantity * price
# polars_features.py
import polars as pl
def revenue(quantity: pl.Series, price: pl.Series) -> pl.Series:
return quantity * price
# 使用不同的模块
import pandas_features
import polars_features
# Pandas 版本
dr_pandas = driver.Builder().with_modules(pandas_features).build()
# Polars 版本
dr_polars = driver.Builder().with_modules(polars_features).build()
场景四:DAG 可视化¶
from hamilton import driver
import features
dr = driver.Builder().with_modules(features).build()
# 可视化所有函数
dr.display_all_functions("full_dag.png")
# 可视化特定输出的执行路径
dr.visualize_execution(
final_vars=["summary_stats"],
inputs={"data_path": "test.csv"},
output_file_path="execution_dag.png",
)
# 只看上游依赖
dr.visualize_execution(
final_vars=["avg_order_value"],
inputs={...},
output_file_path="feature_lineage.png",
)
场景五:与 Airflow/Prefect 集成¶
# Hamilton 作为 Airflow Task 内部的数据管道
# airflow_dag.py
from airflow.decorators import task, dag
from hamilton import driver
import features
@dag(schedule="0 2 * * *", start_date=datetime(2024, 1, 1))
def ml_pipeline():
@task
def compute_features():
dr = driver.Builder().with_modules(features).build()
result = dr.execute(
final_vars=["feature_matrix"],
inputs={"date": "{{ ds }}"},
)
result["feature_matrix"].to_parquet(f"features/{{ ds }}.parquet")
@task
def train_model():
# 训练逻辑
...
compute_features() >> train_model()
场景六:GraphAdapter(自定义执行)¶
from hamilton import driver, base
# 使用 Dask 并行执行
from hamilton.plugins import h_dask
dr = (
driver.Builder()
.with_modules(features)
.with_adapters(h_dask.DaskGraphAdapter(
result_builder=base.PandasDataFrameResult()
))
.build()
)
# 使用 Ray 分布式执行
from hamilton.plugins import h_ray
dr = (
driver.Builder()
.with_modules(features)
.with_adapters(h_ray.RayGraphAdapter(
result_builder=base.PandasDataFrameResult()
))
.build()
)
常见问题与排错¶
问题一:函数名冲突¶
# 错误:两个模块有同名函数
# module_a.py: def revenue(...) -> pd.Series
# module_b.py: def revenue(...) -> pd.Series
# 解决:使用不同的模块或重命名
# 或使用 @config.when 条件加载
from hamilton.function_modifiers import config
@config.when(version="v1")
def revenue__v1(quantity: pd.Series, price: pd.Series) -> pd.Series:
return quantity * price
@config.when(version="v2")
def revenue__v2(quantity: pd.Series, price: pd.Series, discount: pd.Series) -> pd.Series:
return quantity * price * (1 - discount)
问题二:类型注解报错¶
Hamilton 要求所有函数必须有类型注解。
问题三:如何传入外部参数¶
# 使用 inputs 参数传入
result = dr.execute(
final_vars=["output"],
inputs={
"threshold": 100, # 标量参数
"raw_column": df["column"], # Series 参数
"config_path": "/path", # 字符串参数
},
)
问题四:如何处理可选依赖¶
from hamilton.function_modifiers import source, value
import typing
def feature_with_default(
base_value: pd.Series,
multiplier: float = 1.0, # 默认值
) -> pd.Series:
return base_value * multiplier
问题五:DAG 图片生成失败¶
# 需要安装 graphviz
# macOS
brew install graphviz
# Ubuntu
sudo apt-get install graphviz
# pip
pip install graphviz
参考资源¶
- 官方文档:https://hamilton.dagworks.io/
- GitHub:https://github.com/DAGWorks-Inc/hamilton
- 教程:https://hamilton.dagworks.io/en/latest/tutorials/
- DAGWorks Blog:https://blog.dagworks.io/
- 论文:《Hamilton: A Micro-framework for Creating Dataflows》
- Slack 社区:https://hamilton-opensource.slack.com/
- 示例代码:https://github.com/DAGWorks-Inc/hamilton/tree/main/examples
- Stitch Fix 技术博客:https://multithreaded.stitchfix.com/