跳转至

Fugue — 一套代码跑 pandas/Spark/Dask/Ray 的统一数据处理框架


一句话说明

Fugue 让你用普通 Python 或 pandas 代码写业务逻辑,只改一行就能切换到 Spark/Dask/Ray 大规模并行运行,开发时用小数据调试,上线时用集群跑大数据。


安装与配置

# pip 安装基础版
pip install fugue                    # 支持 pandas/DuckDB

# 按需安装后端
pip install fugue[spark]             # Spark 支持
pip install fugue[dask]              # Dask 支持
pip install fugue[ray]               # Ray 支持
pip install fugue[duckdb]            # DuckDB(本地加速)

# 验证
python -c "import fugue; print(fugue.__version__)"

核心用法

用 transform() 并行化函数

import pandas as pd
from fugue import transform

# 1. 先写普通的 pandas 函数(可以本地调试)
def normalize_counts(df: pd.DataFrame) -> pd.DataFrame:
    """对每个样本的计数做 TPM 归一化"""
    df = df.copy()
    total = df["count"].sum()               # 计算总计数
    df["tpm"] = df["count"] / total * 1e6  # 转为 TPM
    return df

# 2. 用 transform() 在不同后端并行执行
result = transform(
    df       = pd.read_csv("counts.csv"),
    using    = normalize_counts,
    schema   = "sample_id:str, gene_id:str, count:int, tpm:float",
    partition = {"by": "sample_id"},        # 按样本并行
    engine   = None,                        # None = pandas(本地调试)
)

# 3. 上线时只改 engine 参数!
# engine = "dask"     # 切换 Dask
# engine = spark      # 切换 Spark(传 SparkSession 对象)

FugueSQL — 用 SQL 描述流程

from fugue.api import fugue_sql

# 用 SQL + Python 混写的 Fugue SQL
query = """
-- 读取数据
SELECT * FROM otu_table WHERE n_reads > 1000

-- 计算分组统计
SELECT group_id,
       AVG(shannon) AS mean_shannon,
       COUNT(*) AS n_samples
FROM otu_table
GROUP BY group_id
ORDER BY mean_shannon DESC
"""

result = fugue_sql(
    query,
    otu_table = pd.read_csv("otu.csv"),    # 传入变量
    engine    = "duckdb",                  # 用 DuckDB 加速
)

完整工作流

from fugue import FugueWorkflow

# 定义变换函数
def filter_low_quality(df: pd.DataFrame) -> pd.DataFrame:
    """去除低质量样本"""
    return df[df["n_reads"] >= 10000].copy()

def calculate_diversity(df: pd.DataFrame) -> pd.DataFrame:
    """计算多样性指数(伪代码示意)"""
    df = df.copy()
    df["richness"] = (df > 0).sum(axis=1)    # 物种丰富度
    return df

# 用 with 块构建工作流
with FugueWorkflow() as dag:
    raw = dag.df(pd.read_csv("otu.csv"))     # 加载数据
    filtered  = raw.transform(filter_low_quality, "*")    # 质量过滤
    diversity = filtered.transform(calculate_diversity, "*,richness:int")
    diversity.show(5)                                       # 预览

实战案例

批量处理多个样本文件

import pandas as pd
from fugue import transform

def process_sample(df: pd.DataFrame) -> pd.DataFrame:
    """处理单个样本(普通 pandas 函数)"""
    df = df.copy()
    df["relative"] = df["count"] / df["count"].sum()  # 相对丰度
    return df

# 一次处理所有样本(按 sample_id 分区并行)
all_data = pd.concat([
    pd.read_csv(f"sample_{i}.csv").assign(sample_id=f"S{i:03d}")
    for i in range(1, 101)
])

result = transform(
    df       = all_data,
    using    = process_sample,
    schema   = "*, relative:float",
    partition = {"by": "sample_id", "num": 10},  # 分 10 个分区
    engine   = "dask",                            # Dask 并行
)

常见报错与解决

报错原因解决
SchemaErrorschema 参数写错检查列名和类型,用 * 保留输入列
ModuleNotFoundError: pysparkSpark 引擎未安装pip install fugue[spark]
PartitionError分区键列不存在确保 by 中的列在 df 中存在

速查表

操作代码
并行变换transform(df, func, schema, partition, engine)
切换 pandasengine=None
切换 DuckDBengine="duckdb"
切换 Daskengine="dask"
FugueSQLfugue_sql(query, **vars)
按列分区partition={"by": "col"}
保留所有列schema="*"
保留并加列schema="*, new_col:float"