跳转至

Hamilton 微流程完全指南

为什么要学 Hamilton

  1. 函数即节点,极度简洁:Hamilton 用 Python 函数的名字和参数自动构建 DAG。函数名就是数据节点名,参数名引用上游节点。不需要装饰器、不需要声明依赖、不需要配置文件。这是最简洁的数据管道定义方式。

  2. 类型驱动,自文档化:Hamilton 强制使用 Python 类型注解。类型注解既是文档也是验证机制。看一眼函数签名就知道输入输出类型,DAG 关系一目了然。

  3. DAG 可视化dr.visualize_execution() 一行代码生成 DAG 图,清晰展示数据流向。这在 ML 特征工程中尤其有价值——你可以直观看到每个特征的计算路径。

  4. 轻量级,零框架锁定:Hamilton 不是 Airflow/Prefect 那种重量级编排框架。它只是一个小巧的 Python 库(~200KB),负责组织你的函数调用。可以在 Jupyter Notebook、脚本、微服务中使用,也可以嵌入 Airflow/Prefect 作为单个任务。

  5. ML 特征工程的最佳拍档:Hamilton 诞生于 Stitch Fix 的 ML 团队,专门解决特征工程中的痛点:特征复用、版本管理、血缘追踪、测试。如果你做 ML 特征工程,Hamilton 是目前最优雅的解决方案。


核心概念详解

Hamilton 是什么(白话解释)

假设你在做一份数据分析:先加载原始数据,然后清洗、计算特征、汇总统计。每个步骤是一个函数。

普通做法:

raw = load_data()
cleaned = clean(raw)
feature_a = calc_feature_a(cleaned)
feature_b = calc_feature_b(cleaned)
result = summarize(feature_a, feature_b)

Hamilton 的做法:你只需要定义函数,Hamilton 自动理解它们之间的依赖关系(通过参数名匹配函数名),自动按正确顺序执行。

核心原则

原则说明
函数名 = 节点名函数名就是数据流图中的节点标识
参数名 = 依赖函数参数名引用其他函数的输出
类型注解必须每个函数必须有类型注解
无副作用函数应该是纯函数(相同输入 → 相同输出)
确定性 DAGDAG 在定义时就完全确定

Hamilton 工作流程

步骤 1: 定义函数模块(functions.py)
步骤 2: 创建 Driver
步骤 3: Driver 扫描模块,构建 DAG
步骤 4: 调用 dr.execute(),指定需要的输出
步骤 5: Hamilton 自动执行必要的函数链

Hamilton vs Pandas pipe vs 手动调用对比

特性HamiltonPandas pipe手动函数调用
依赖管理自动推导手动
DAG可视化内置
复用性高(模块化)
可测试性高(纯函数)看代码质量
文档化类型注解自文档需手动
血缘追踪自动手动
并行执行支持不支持手动
增量执行支持不支持手动
代码量最少最多
学习曲线低(就是写函数)极低极低

安装与配置

# 基本安装
pip install sf-hamilton

# 带可视化支持
pip install "sf-hamilton[visualization]"

# 带 Pandas 支持
pip install "sf-hamilton[pandas]"

# 带 Polars 支持
pip install "sf-hamilton[polars]"

# 带 PySpark 支持
pip install "sf-hamilton[pyspark]"

# 全部安装
pip install "sf-hamilton[visualization,pandas,polars]"

项目结构(推荐)

my_ml_project/
├── features/              # 特征定义模块
│   ├── __init__.py
│   ├── user_features.py   # 用户相关特征
│   ├── order_features.py  # 订单相关特征
│   └── text_features.py   # 文本特征
├── transforms/            # 数据转换模块
│   ├── __init__.py
│   ├── cleaning.py
│   └── aggregation.py
├── pipelines/
│   ├── training.py        # 训练 Pipeline Driver
│   └── inference.py       # 推理 Pipeline Driver
├── tests/
│   ├── test_features.py
│   └── test_transforms.py
└── notebooks/
    └── exploration.ipynb

快速上手:5 分钟最小示例

features.py(定义函数):

import pandas as pd

def raw_data(data_path: str) -> pd.DataFrame:
    """加载原始数据"""
    return pd.read_csv(data_path)

def cleaned_data(raw_data: pd.DataFrame) -> pd.DataFrame:
    """清洗数据:去重、去空值"""
    return raw_data.drop_duplicates().dropna()

def total_amount(cleaned_data: pd.DataFrame) -> pd.Series:
    """计算总金额"""
    return cleaned_data["quantity"] * cleaned_data["unit_price"]

def avg_amount(total_amount: pd.Series) -> float:
    """计算平均金额"""
    return total_amount.mean()

def high_value_flag(total_amount: pd.Series, threshold: float = 100.0) -> pd.Series:
    """标记高价值订单"""
    return total_amount > threshold

def summary_stats(total_amount: pd.Series, high_value_flag: pd.Series) -> dict:
    """汇总统计"""
    return {
        "mean": total_amount.mean(),
        "median": total_amount.median(),
        "high_value_pct": high_value_flag.mean() * 100,
        "count": len(total_amount),
    }

run.py(Driver 执行):

from hamilton import driver
import features  # 导入函数模块

# 创建 Driver
dr = driver.Builder().with_modules(features).build()

# 可视化 DAG
dr.display_all_functions("dag.png")

# 执行:指定需要的输出和输入参数
result = dr.execute(
    final_vars=["summary_stats", "avg_amount"],  # 需要的输出
    inputs={"data_path": "orders.csv", "threshold": 200.0},  # 输入参数
)

print(result["summary_stats"])
print(f"平均金额: {result['avg_amount']:.2f}")

运行:

python run.py

DAG 自动推导结果:

data_path ─→ raw_data ─→ cleaned_data ─→ total_amount ─→ avg_amount
                                              ↓              ↓
                              threshold ─→ high_value_flag   │
                                              ↓              │
                                         summary_stats ←─────┘


进阶用法

场景一:ML 特征工程

# user_features.py
import pandas as pd
import numpy as np

def user_age(birth_date: pd.Series) -> pd.Series:
    """用户年龄"""
    return (pd.Timestamp.now() - pd.to_datetime(birth_date)).dt.days / 365.25

def user_tenure_days(signup_date: pd.Series) -> pd.Series:
    """注册天数"""
    return (pd.Timestamp.now() - pd.to_datetime(signup_date)).dt.days

def user_is_new(user_tenure_days: pd.Series, new_user_threshold: int = 30) -> pd.Series:
    """是否新用户"""
    return user_tenure_days < new_user_threshold

def total_orders(order_count: pd.Series) -> pd.Series:
    """总订单数(直接透传,命名规范化)"""
    return order_count

def avg_order_value(total_spend: pd.Series, total_orders: pd.Series) -> pd.Series:
    """平均订单价值"""
    return total_spend / total_orders.clip(lower=1)

def user_segment(avg_order_value: pd.Series, total_orders: pd.Series) -> pd.Series:
    """用户分层"""
    conditions = [
        (avg_order_value > 500) & (total_orders > 10),
        (avg_order_value > 200) | (total_orders > 5),
    ]
    choices = ["VIP", "常客"]
    return pd.Series(np.select(conditions, choices, default="普通"), name="user_segment")

def log_total_spend(total_spend: pd.Series) -> pd.Series:
    """总消费对数变换"""
    return np.log1p(total_spend)

def days_since_last_order(last_order_date: pd.Series) -> pd.Series:
    """距上次下单天数"""
    return (pd.Timestamp.now() - pd.to_datetime(last_order_date)).dt.days
# training_pipeline.py
from hamilton import driver
import user_features

dr = driver.Builder().with_modules(user_features).build()

# 输入是已有的原始列
raw_df = pd.read_parquet("users.parquet")
inputs = {col: raw_df[col] for col in raw_df.columns}
inputs["new_user_threshold"] = 90

# 指定需要的特征列
feature_columns = [
    "user_age", "user_tenure_days", "user_is_new",
    "avg_order_value", "user_segment",
    "log_total_spend", "days_since_last_order",
]

result = dr.execute(final_vars=feature_columns, inputs=inputs)

# 组合成特征矩阵
feature_df = pd.DataFrame(result)

场景二:装饰器增强

from hamilton.function_modifiers import (
    extract_columns, parameterize, check_output, tag
)
import pandas as pd
import pandera as pa

# extract_columns:从 DataFrame 提取多列
@extract_columns("user_id", "name", "email", "signup_date")
def raw_users(users_path: str) -> pd.DataFrame:
    return pd.read_csv(users_path)

# parameterize:生成多个相似的函数
@parameterize(
    total_sales_7d={"window_days": 7},
    total_sales_30d={"window_days": 30},
    total_sales_90d={"window_days": 90},
)
def total_sales_Xd(order_dates: pd.Series, amounts: pd.Series, window_days: int) -> pd.Series:
    cutoff = pd.Timestamp.now() - pd.Timedelta(days=window_days)
    mask = pd.to_datetime(order_dates) >= cutoff
    return amounts.where(mask, 0)

# check_output:输出验证
@check_output(
    schema=pa.SeriesSchema(
        float,
        checks=[
            pa.Check.greater_than(0),
            pa.Check.less_than(1_000_000),
        ]
    )
)
def revenue(quantity: pd.Series, unit_price: pd.Series) -> pd.Series:
    return quantity * unit_price

# tag:为函数添加标签
@tag(owner="data-team", pii="false", importance="high")
def critical_metric(revenue: pd.Series) -> float:
    return revenue.sum()

场景三:多后端支持(Pandas → Polars → Spark)

# 同一份逻辑,不同后端
# pandas_features.py
import pandas as pd

def revenue(quantity: pd.Series, price: pd.Series) -> pd.Series:
    return quantity * price

# polars_features.py
import polars as pl

def revenue(quantity: pl.Series, price: pl.Series) -> pl.Series:
    return quantity * price

# 使用不同的模块
import pandas_features
import polars_features

# Pandas 版本
dr_pandas = driver.Builder().with_modules(pandas_features).build()

# Polars 版本
dr_polars = driver.Builder().with_modules(polars_features).build()

场景四:DAG 可视化

from hamilton import driver
import features

dr = driver.Builder().with_modules(features).build()

# 可视化所有函数
dr.display_all_functions("full_dag.png")

# 可视化特定输出的执行路径
dr.visualize_execution(
    final_vars=["summary_stats"],
    inputs={"data_path": "test.csv"},
    output_file_path="execution_dag.png",
)

# 只看上游依赖
dr.visualize_execution(
    final_vars=["avg_order_value"],
    inputs={...},
    output_file_path="feature_lineage.png",
)

场景五:与 Airflow/Prefect 集成

# Hamilton 作为 Airflow Task 内部的数据管道
# airflow_dag.py
from airflow.decorators import task, dag
from hamilton import driver
import features

@dag(schedule="0 2 * * *", start_date=datetime(2024, 1, 1))
def ml_pipeline():

    @task
    def compute_features():
        dr = driver.Builder().with_modules(features).build()
        result = dr.execute(
            final_vars=["feature_matrix"],
            inputs={"date": "{{ ds }}"},
        )
        result["feature_matrix"].to_parquet(f"features/{{ ds }}.parquet")

    @task
    def train_model():
        # 训练逻辑
        ...

    compute_features() >> train_model()

场景六:GraphAdapter(自定义执行)

from hamilton import driver, base

# 使用 Dask 并行执行
from hamilton.plugins import h_dask

dr = (
    driver.Builder()
    .with_modules(features)
    .with_adapters(h_dask.DaskGraphAdapter(
        result_builder=base.PandasDataFrameResult()
    ))
    .build()
)

# 使用 Ray 分布式执行
from hamilton.plugins import h_ray

dr = (
    driver.Builder()
    .with_modules(features)
    .with_adapters(h_ray.RayGraphAdapter(
        result_builder=base.PandasDataFrameResult()
    ))
    .build()
)

常见问题与排错

问题一:函数名冲突

# 错误:两个模块有同名函数
# module_a.py:  def revenue(...) -> pd.Series
# module_b.py:  def revenue(...) -> pd.Series

# 解决:使用不同的模块或重命名
# 或使用 @config.when 条件加载
from hamilton.function_modifiers import config

@config.when(version="v1")
def revenue__v1(quantity: pd.Series, price: pd.Series) -> pd.Series:
    return quantity * price

@config.when(version="v2")
def revenue__v2(quantity: pd.Series, price: pd.Series, discount: pd.Series) -> pd.Series:
    return quantity * price * (1 - discount)

问题二:类型注解报错

Hamilton 要求所有函数必须有类型注解。

# 错误:缺少类型注解
def my_func(x):
    return x * 2

# 正确
def my_func(x: pd.Series) -> pd.Series:
    return x * 2

问题三:如何传入外部参数

# 使用 inputs 参数传入
result = dr.execute(
    final_vars=["output"],
    inputs={
        "threshold": 100,           # 标量参数
        "raw_column": df["column"], # Series 参数
        "config_path": "/path",     # 字符串参数
    },
)

问题四:如何处理可选依赖

from hamilton.function_modifiers import source, value
import typing

def feature_with_default(
    base_value: pd.Series,
    multiplier: float = 1.0,  # 默认值
) -> pd.Series:
    return base_value * multiplier

问题五:DAG 图片生成失败

# 需要安装 graphviz
# macOS
brew install graphviz

# Ubuntu
sudo apt-get install graphviz

# pip
pip install graphviz

参考资源

  • 官方文档:https://hamilton.dagworks.io/
  • GitHub:https://github.com/DAGWorks-Inc/hamilton
  • 教程:https://hamilton.dagworks.io/en/latest/tutorials/
  • DAGWorks Blog:https://blog.dagworks.io/
  • 论文:《Hamilton: A Micro-framework for Creating Dataflows》
  • Slack 社区:https://hamilton-opensource.slack.com/
  • 示例代码:https://github.com/DAGWorks-Inc/hamilton/tree/main/examples
  • Stitch Fix 技术博客:https://multithreaded.stitchfix.com/