Hamilton — 基于函数依赖图的 Python 数据管道框架
一句话说明
Hamilton 把每个 Python 函数当作数据管道的一个节点,函数名即输出列名,参数名即输入依赖,自动推导执行顺序,让数据流一目了然且极易测试。
安装与配置
# pip 安装
pip install sf-hamilton # 注意包名有 sf- 前缀
# 可视化支持
pip install sf-hamilton[visualization] # 生成依赖图图片
# 验证
python -c "from hamilton import driver; print('OK')"
核心用法
核心思想:函数名=节点名
# 文件:features.py(特征计算模块)
import pandas as pd
# Hamilton 规则:函数名是它输出的列名,参数名是它依赖的列名
def raw_counts(data_path: str) -> pd.Series:
"""从文件读取原始计数(data_path 由外部传入)"""
return pd.read_csv(data_path)["count"]
def total_reads(raw_counts: pd.Series) -> float:
"""计算总计数(依赖 raw_counts)"""
return raw_counts.sum()
def relative_abundance(raw_counts: pd.Series, total_reads: float) -> pd.Series:
"""计算相对丰度(依赖 raw_counts 和 total_reads)"""
return raw_counts / total_reads
def log_abundance(relative_abundance: pd.Series) -> pd.Series:
"""对数变换(依赖 relative_abundance)"""
import numpy as np
return np.log1p(relative_abundance) # log(1+x)
构建并运行 DAG
# 文件:run.py(运行脚本)
from hamilton import driver
import features # 导入上面的模块
# 构建 Driver(相当于编译 DAG)
dr = driver.Builder().with_modules(features).build()
# 可视化依赖图(需要安装 visualization)
dr.display_all_functions("pipeline.png") # 生成依赖图图片
# 运行,指定你要的输出
result = dr.execute(
final_vars = ["log_abundance"], # 只要这个输出
inputs = {"data_path": "otu.csv"} # 传入外部参数
)
print(result["log_abundance"]) # pd.Series
实战案例
生信特征工程管道
# 文件:bioinf_features.py
import pandas as pd
import numpy as np
def otu_matrix(otu_path: str) -> pd.DataFrame:
"""读取 OTU 矩阵(参数 otu_path 由外部输入)"""
return pd.read_csv(otu_path, index_col=0)
def filtered_matrix(otu_matrix: pd.DataFrame, min_reads: int = 1000) -> pd.DataFrame:
"""过滤低质量样本"""
row_sums = otu_matrix.sum(axis=1)
return otu_matrix[row_sums >= min_reads]
def normalized_matrix(filtered_matrix: pd.DataFrame) -> pd.DataFrame:
"""每行归一化(相对丰度)"""
return filtered_matrix.div(filtered_matrix.sum(axis=1), axis=0)
def shannon_index(normalized_matrix: pd.DataFrame) -> pd.Series:
"""计算 Shannon 多样性指数"""
m = normalized_matrix.replace(0, np.nan)
entropy = -(m * np.log(m)).sum(axis=1)
return entropy.rename("shannon")
def species_richness(filtered_matrix: pd.DataFrame) -> pd.Series:
"""物种丰富度(非零物种数)"""
return (filtered_matrix > 0).sum(axis=1).rename("richness")
# 文件:run_pipeline.py
from hamilton import driver
import bioinf_features as bf
dr = driver.Builder().with_modules(bf).build()
# 同时要多个输出(Hamilton 自动去重计算)
result = dr.execute(
final_vars = ["shannon_index", "species_richness"],
inputs = {
"otu_path": "data/otu.csv",
"min_reads": 5000, # 覆盖默认值
}
)
import pandas as pd
summary = pd.DataFrame(result)
print(summary.describe())
常见报错与解决
| 报错 | 原因 | 解决 |
|---|
ValueError: ... not found in inputs | 缺少外部参数 | 在 inputs={} 中补充参数 |
GraphCreationError: cycle detected | 函数间存在循环依赖 | 检查参数名是否与函数名构成环 |
MissingDependencyError | 参数名找不到对应函数 | 确保函数名与参数名完全匹配 |
速查表
| 操作 | 代码 |
|---|
| 构建 Driver | driver.Builder().with_modules(mod).build() |
| 执行管道 | dr.execute(final_vars=["f"], inputs={"p": v}) |
| 可视化 | dr.display_all_functions("graph.png") |
| 函数依赖 | 参数名即依赖名,函数名即输出名 |
| 默认参数 | def f(x: int, scale: float = 1.0) |
| 多输出 | final_vars=["f1","f2","f3"] |