跳转至

Dask 并行计算

为什么要学 Dask

Dask 是一个灵活的 Python 并行计算库,可以将 NumPy、Pandas 和 scikit-learn 的工作负载扩展到多核或集群环境。它提供了与这些库高度兼容的 API,同时支持自定义任务图来编排复杂的并行工作流。对于数据集超过内存容量或需要加速计算密集型任务的 Python 项目来说,Dask 是最主流的扩展方案。


核心概念

概念白话解释用途
Dask DataFrame分布式数据框兼容 Pandas 的大数据处理
Dask Array分布式数组兼容 NumPy 的大数组计算
Dask Bag分布式集合处理非结构化数据
Task Graph任务图延迟计算的依赖关系图
Scheduler调度器执行任务的引擎(单机/分布式)
Delayed延迟修饰器将普通函数转为延迟计算

安装配置

pip install "dask[complete]"
# 包含:distributed, dataframe, array, bag, diagnostics

快速上手

Dask DataFrame

import dask.dataframe as dd

# 读取大文件(自动分块)
df = dd.read_csv("large_*.csv")     # 支持通配符
df = dd.read_parquet("data/")       # 读取 Parquet 目录

# 操作(延迟执行)
result = df.groupby("category").agg({"price": "mean"})

# 触发计算
computed = result.compute()  # 返回 Pandas DataFrame

# 写入
df.to_parquet("output/", write_index=False)

Dask Delayed

from dask import delayed
import dask

@delayed
def load(path):
    return pd.read_csv(path)

@delayed
def process(df):
    return df[df["score"] > 80].groupby("name").mean()

@delayed
def combine(results):
    return pd.concat(results)

# 构建任务图
files = ["data1.csv", "data2.csv", "data3.csv"]
loaded = [load(f) for f in files]
processed = [process(df) for df in loaded]
final = combine(processed)

# 执行
result = final.compute()

分布式调度器

from dask.distributed import Client

# 本地集群
client = Client(n_workers=4, threads_per_worker=2, memory_limit="4GB")
print(client.dashboard_link)  # 监控面板 http://localhost:8787

# 使用分布式客户端后,所有 Dask 操作自动并行
df = dd.read_csv("data.csv")
result = df.groupby("key").sum().compute()

进阶用法

与机器学习集成

import dask_ml.model_selection as dcv
from sklearn.ensemble import RandomForestClassifier

# 分布式交叉验证
model = RandomForestClassifier()
param_grid = {"n_estimators": [100, 200, 500], "max_depth": [5, 10, 20]}
search = dcv.GridSearchCV(model, param_grid, cv=5)
search.fit(X, y)

自定义任务图

import dask

# 可视化任务图
final.visualize("task_graph.png")

内存管理

# 设置 Worker 内存限制
client = Client(memory_limit="8GB")

# 监控内存使用
client.run(lambda: psutil.virtual_memory().percent)

常见问题

Q: Dask vs Spark?

  • Dask:Python 原生、与 Pandas/NumPy 兼容、适合中等规模
  • Spark:JVM 生态、更成熟的集群管理、适合超大规模

Q: 所有 Pandas 操作都支持吗?

大部分常用操作支持。不支持的操作可以用 map_partitions 在每个分区上执行 Pandas 代码。

Q: 如何选择分区数?

通常设置为 CPU 核心数的 2-4 倍,每个分区 50-200MB 数据。


参考资源

  • 官网:https://dask.org/
  • GitHub:https://github.com/dask/dask
  • 文档:https://docs.dask.org/
  • Dashboard:https://docs.dask.org/en/latest/dashboard.html
  • 教程:https://tutorial.dask.org/