Fugue — 一套代码跑 pandas/Spark/Dask/Ray 的统一数据处理框架
一句话说明
Fugue 让你用普通 Python 或 pandas 代码写业务逻辑,只改一行就能切换到 Spark/Dask/Ray 大规模并行运行,开发时用小数据调试,上线时用集群跑大数据。
安装与配置
# pip 安装基础版
pip install fugue # 支持 pandas/DuckDB
# 按需安装后端
pip install fugue[spark] # Spark 支持
pip install fugue[dask] # Dask 支持
pip install fugue[ray] # Ray 支持
pip install fugue[duckdb] # DuckDB(本地加速)
# 验证
python -c "import fugue; print(fugue.__version__)"
核心用法
import pandas as pd
from fugue import transform
# 1. 先写普通的 pandas 函数(可以本地调试)
def normalize_counts(df: pd.DataFrame) -> pd.DataFrame:
"""对每个样本的计数做 TPM 归一化"""
df = df.copy()
total = df["count"].sum() # 计算总计数
df["tpm"] = df["count"] / total * 1e6 # 转为 TPM
return df
# 2. 用 transform() 在不同后端并行执行
result = transform(
df = pd.read_csv("counts.csv"),
using = normalize_counts,
schema = "sample_id:str, gene_id:str, count:int, tpm:float",
partition = {"by": "sample_id"}, # 按样本并行
engine = None, # None = pandas(本地调试)
)
# 3. 上线时只改 engine 参数!
# engine = "dask" # 切换 Dask
# engine = spark # 切换 Spark(传 SparkSession 对象)
FugueSQL — 用 SQL 描述流程
from fugue.api import fugue_sql
# 用 SQL + Python 混写的 Fugue SQL
query = """
-- 读取数据
SELECT * FROM otu_table WHERE n_reads > 1000
-- 计算分组统计
SELECT group_id,
AVG(shannon) AS mean_shannon,
COUNT(*) AS n_samples
FROM otu_table
GROUP BY group_id
ORDER BY mean_shannon DESC
"""
result = fugue_sql(
query,
otu_table = pd.read_csv("otu.csv"), # 传入变量
engine = "duckdb", # 用 DuckDB 加速
)
完整工作流
from fugue import FugueWorkflow
# 定义变换函数
def filter_low_quality(df: pd.DataFrame) -> pd.DataFrame:
"""去除低质量样本"""
return df[df["n_reads"] >= 10000].copy()
def calculate_diversity(df: pd.DataFrame) -> pd.DataFrame:
"""计算多样性指数(伪代码示意)"""
df = df.copy()
df["richness"] = (df > 0).sum(axis=1) # 物种丰富度
return df
# 用 with 块构建工作流
with FugueWorkflow() as dag:
raw = dag.df(pd.read_csv("otu.csv")) # 加载数据
filtered = raw.transform(filter_low_quality, "*") # 质量过滤
diversity = filtered.transform(calculate_diversity, "*,richness:int")
diversity.show(5) # 预览
实战案例
批量处理多个样本文件
import pandas as pd
from fugue import transform
def process_sample(df: pd.DataFrame) -> pd.DataFrame:
"""处理单个样本(普通 pandas 函数)"""
df = df.copy()
df["relative"] = df["count"] / df["count"].sum() # 相对丰度
return df
# 一次处理所有样本(按 sample_id 分区并行)
all_data = pd.concat([
pd.read_csv(f"sample_{i}.csv").assign(sample_id=f"S{i:03d}")
for i in range(1, 101)
])
result = transform(
df = all_data,
using = process_sample,
schema = "*, relative:float",
partition = {"by": "sample_id", "num": 10}, # 分 10 个分区
engine = "dask", # Dask 并行
)
常见报错与解决
| 报错 | 原因 | 解决 |
|---|
SchemaError | schema 参数写错 | 检查列名和类型,用 * 保留输入列 |
ModuleNotFoundError: pyspark | Spark 引擎未安装 | pip install fugue[spark] |
PartitionError | 分区键列不存在 | 确保 by 中的列在 df 中存在 |
速查表
| 操作 | 代码 |
|---|
| 并行变换 | transform(df, func, schema, partition, engine) |
| 切换 pandas | engine=None |
| 切换 DuckDB | engine="duckdb" |
| 切换 Dask | engine="dask" |
| FugueSQL | fugue_sql(query, **vars) |
| 按列分区 | partition={"by": "col"} |
| 保留所有列 | schema="*" |
| 保留并加列 | schema="*, new_col:float" |