跳转至

Hamilton — 基于函数依赖图的 Python 数据管道框架


一句话说明

Hamilton 把每个 Python 函数当作数据管道的一个节点,函数名即输出列名,参数名即输入依赖,自动推导执行顺序,让数据流一目了然且极易测试。


安装与配置

# pip 安装
pip install sf-hamilton              # 注意包名有 sf- 前缀

# 可视化支持
pip install sf-hamilton[visualization]   # 生成依赖图图片

# 验证
python -c "from hamilton import driver; print('OK')"

核心用法

核心思想:函数名=节点名

# 文件:features.py(特征计算模块)
import pandas as pd

# Hamilton 规则:函数名是它输出的列名,参数名是它依赖的列名

def raw_counts(data_path: str) -> pd.Series:
    """从文件读取原始计数(data_path 由外部传入)"""
    return pd.read_csv(data_path)["count"]

def total_reads(raw_counts: pd.Series) -> float:
    """计算总计数(依赖 raw_counts)"""
    return raw_counts.sum()

def relative_abundance(raw_counts: pd.Series, total_reads: float) -> pd.Series:
    """计算相对丰度(依赖 raw_counts 和 total_reads)"""
    return raw_counts / total_reads

def log_abundance(relative_abundance: pd.Series) -> pd.Series:
    """对数变换(依赖 relative_abundance)"""
    import numpy as np
    return np.log1p(relative_abundance)   # log(1+x)

构建并运行 DAG

# 文件:run.py(运行脚本)
from hamilton import driver
import features                           # 导入上面的模块

# 构建 Driver(相当于编译 DAG)
dr = driver.Builder().with_modules(features).build()

# 可视化依赖图(需要安装 visualization)
dr.display_all_functions("pipeline.png")  # 生成依赖图图片

# 运行,指定你要的输出
result = dr.execute(
    final_vars  = ["log_abundance"],       # 只要这个输出
    inputs      = {"data_path": "otu.csv"} # 传入外部参数
)
print(result["log_abundance"])             # pd.Series

实战案例

生信特征工程管道

# 文件:bioinf_features.py

import pandas as pd
import numpy as np

def otu_matrix(otu_path: str) -> pd.DataFrame:
    """读取 OTU 矩阵(参数 otu_path 由外部输入)"""
    return pd.read_csv(otu_path, index_col=0)

def filtered_matrix(otu_matrix: pd.DataFrame, min_reads: int = 1000) -> pd.DataFrame:
    """过滤低质量样本"""
    row_sums = otu_matrix.sum(axis=1)
    return otu_matrix[row_sums >= min_reads]

def normalized_matrix(filtered_matrix: pd.DataFrame) -> pd.DataFrame:
    """每行归一化(相对丰度)"""
    return filtered_matrix.div(filtered_matrix.sum(axis=1), axis=0)

def shannon_index(normalized_matrix: pd.DataFrame) -> pd.Series:
    """计算 Shannon 多样性指数"""
    m = normalized_matrix.replace(0, np.nan)
    entropy = -(m * np.log(m)).sum(axis=1)
    return entropy.rename("shannon")

def species_richness(filtered_matrix: pd.DataFrame) -> pd.Series:
    """物种丰富度(非零物种数)"""
    return (filtered_matrix > 0).sum(axis=1).rename("richness")
# 文件:run_pipeline.py
from hamilton import driver
import bioinf_features as bf

dr = driver.Builder().with_modules(bf).build()

# 同时要多个输出(Hamilton 自动去重计算)
result = dr.execute(
    final_vars = ["shannon_index", "species_richness"],
    inputs     = {
        "otu_path":  "data/otu.csv",
        "min_reads": 5000,               # 覆盖默认值
    }
)

import pandas as pd
summary = pd.DataFrame(result)
print(summary.describe())

常见报错与解决

报错原因解决
ValueError: ... not found in inputs缺少外部参数inputs={} 中补充参数
GraphCreationError: cycle detected函数间存在循环依赖检查参数名是否与函数名构成环
MissingDependencyError参数名找不到对应函数确保函数名与参数名完全匹配

速查表

操作代码
构建 Driverdriver.Builder().with_modules(mod).build()
执行管道dr.execute(final_vars=["f"], inputs={"p": v})
可视化dr.display_all_functions("graph.png")
函数依赖参数名即依赖名,函数名即输出名
默认参数def f(x: int, scale: float = 1.0)
多输出final_vars=["f1","f2","f3"]