Dask 并行计算¶
为什么要学 Dask¶
Dask 是一个灵活的 Python 并行计算库,可以将 NumPy、Pandas 和 scikit-learn 的工作负载扩展到多核或集群环境。它提供了与这些库高度兼容的 API,同时支持自定义任务图来编排复杂的并行工作流。对于数据集超过内存容量或需要加速计算密集型任务的 Python 项目来说,Dask 是最主流的扩展方案。
核心概念¶
| 概念 | 白话解释 | 用途 |
|---|---|---|
| Dask DataFrame | 分布式数据框 | 兼容 Pandas 的大数据处理 |
| Dask Array | 分布式数组 | 兼容 NumPy 的大数组计算 |
| Dask Bag | 分布式集合 | 处理非结构化数据 |
| Task Graph | 任务图 | 延迟计算的依赖关系图 |
| Scheduler | 调度器 | 执行任务的引擎(单机/分布式) |
| Delayed | 延迟修饰器 | 将普通函数转为延迟计算 |
安装配置¶
快速上手¶
Dask DataFrame¶
import dask.dataframe as dd
# 读取大文件(自动分块)
df = dd.read_csv("large_*.csv") # 支持通配符
df = dd.read_parquet("data/") # 读取 Parquet 目录
# 操作(延迟执行)
result = df.groupby("category").agg({"price": "mean"})
# 触发计算
computed = result.compute() # 返回 Pandas DataFrame
# 写入
df.to_parquet("output/", write_index=False)
Dask Delayed¶
from dask import delayed
import dask
@delayed
def load(path):
return pd.read_csv(path)
@delayed
def process(df):
return df[df["score"] > 80].groupby("name").mean()
@delayed
def combine(results):
return pd.concat(results)
# 构建任务图
files = ["data1.csv", "data2.csv", "data3.csv"]
loaded = [load(f) for f in files]
processed = [process(df) for df in loaded]
final = combine(processed)
# 执行
result = final.compute()
分布式调度器¶
from dask.distributed import Client
# 本地集群
client = Client(n_workers=4, threads_per_worker=2, memory_limit="4GB")
print(client.dashboard_link) # 监控面板 http://localhost:8787
# 使用分布式客户端后,所有 Dask 操作自动并行
df = dd.read_csv("data.csv")
result = df.groupby("key").sum().compute()
进阶用法¶
与机器学习集成¶
import dask_ml.model_selection as dcv
from sklearn.ensemble import RandomForestClassifier
# 分布式交叉验证
model = RandomForestClassifier()
param_grid = {"n_estimators": [100, 200, 500], "max_depth": [5, 10, 20]}
search = dcv.GridSearchCV(model, param_grid, cv=5)
search.fit(X, y)
自定义任务图¶
内存管理¶
# 设置 Worker 内存限制
client = Client(memory_limit="8GB")
# 监控内存使用
client.run(lambda: psutil.virtual_memory().percent)
常见问题¶
Q: Dask vs Spark?¶
- Dask:Python 原生、与 Pandas/NumPy 兼容、适合中等规模
- Spark:JVM 生态、更成熟的集群管理、适合超大规模
Q: 所有 Pandas 操作都支持吗?¶
大部分常用操作支持。不支持的操作可以用 map_partitions 在每个分区上执行 Pandas 代码。
Q: 如何选择分区数?¶
通常设置为 CPU 核心数的 2-4 倍,每个分区 50-200MB 数据。
参考资源¶
- 官网:https://dask.org/
- GitHub:https://github.com/dask/dask
- 文档:https://docs.dask.org/
- Dashboard:https://docs.dask.org/en/latest/dashboard.html
- 教程:https://tutorial.dask.org/