Delta Lake 完全指南¶
为什么要学 Delta Lake¶
给数据湖带来 ACID 事务:传统数据湖(Parquet/CSV 文件在 S3/HDFS 上)没有事务保障——并发写入可能损坏数据,失败的作业留下脏数据,读取可能看到不一致状态。Delta Lake 在数据湖之上加了一个事务日志层,让数据湖拥有数据仓库级别的可靠性。
时间旅行(Time Travel):Delta Lake 保留数据的历史版本。你可以查询"昨天的数据是什么样",回滚误操作,做审计追踪。这对数据管道调试和合规性要求至关重要。
Schema 演进与强制:当数据结构变化时(新增列、修改类型),Delta Lake 可以安全地演进 schema,同时防止意外的 schema 破坏。不会因为上游数据格式变了就导致整个管道崩溃。
Lakehouse 架构的基石:Delta Lake 是 Databricks 提出的 Lakehouse 架构的核心。它让你在同一个数据存储上同时运行 BI 查询和 ML 训练,不需要把数据复制到数据仓库。
开源且生态广泛:Delta Lake 是 Linux Foundation 下的开源项目,支持 PySpark、Spark、Rust (delta-rs)、Polars、pandas 等多种引擎。不绑定 Databricks 平台。
核心概念详解¶
Delta Lake 是什么(白话解释)¶
想象你有一堆 Excel 文件存在一个共享文件夹里。多人同时修改可能导致冲突,删错了文件就没了,也不知道谁在什么时候改了什么。
Delta Lake 就像给这些文件加了一个"智能管家": - 记录每次修改的日志(谁改了什么、什么时候改的) - 保证不会有两个人同时写坏数据 - 随时可以回到之前的任何版本 - 自动检查数据格式是否正确
在技术上,Delta Lake 就是在 Parquet 文件的基础上加了一个 _delta_log/ 目录,里面存放 JSON 格式的事务日志。
Delta Lake 事务日志¶
my_table/
├── _delta_log/ # 事务日志
│ ├── 00000000000000000000.json # 第0次提交
│ ├── 00000000000000000001.json # 第1次提交
│ ├── 00000000000000000002.json # 第2次提交
│ └── 00000000000000000010.checkpoint.parquet # 检查点
├── part-00000-xxx.parquet # 数据文件
├── part-00001-xxx.parquet
└── part-00002-xxx.parquet
每个 JSON 日志记录了: - 添加了哪些文件(add) - 删除了哪些文件(remove) - 元数据变更(metadata) - 事务标识
Delta Lake vs Apache Iceberg vs Apache Hudi 对比¶
| 特性 | Delta Lake | Apache Iceberg | Apache Hudi |
|---|---|---|---|
| 创始方 | Databricks | Netflix → Apache | Uber → Apache |
| 协议 | Delta Log (JSON) | Manifest 文件 | Timeline |
| ACID 事务 | 支持 | 支持 | 支持 |
| 时间旅行 | 支持 | 支持 | 支持(有限) |
| Schema 演进 | 支持 | 支持(更灵活) | 支持 |
| 分区演进 | 需重写 | 原生支持 | 部分支持 |
| MERGE/UPSERT | 原生支持 | 支持 | 核心特性 |
| 流处理 | Spark Structured Streaming | Flink/Spark | 核心特性 |
| 小文件合并 | OPTIMIZE + Z-ORDER | 自动 compaction | compaction |
| Spark 集成 | 最深 | 好 | 好 |
| 非Spark引擎 | delta-rs, Polars, DuckDB | Trino, Flink, Presto | 有限 |
| 云存储支持 | S3/ADLS/GCS | S3/ADLS/GCS | S3/ADLS/GCS |
| 开放标准 | Delta UniForm 兼容 | 最开放 | 相对封闭 |
| 社区规模 | 大(Databricks背书) | 快速增长 | 中等 |
核心特性详解¶
| 特性 | 说明 |
|---|---|
| ACID 事务 | 每次写入都是原子操作,要么完全成功,要么完全回滚 |
| 乐观并发 | 多个写入者可以并发操作,冲突时自动重试或报错 |
| 时间旅行 | 通过版本号或时间戳查询历史数据 |
| Schema 强制 | 写入时检查 schema 是否匹配,防止数据污染 |
| Schema 演进 | 安全地添加、删除、重命名列 |
| MERGE (Upsert) | 基于条件合并数据:匹配则更新,不匹配则插入 |
| Z-Ordering | 按指定列重新组织数据布局,优化查询性能 |
| Liquid Clustering | 自动数据聚类,替代手动 Z-Order |
| Change Data Feed | 追踪数据变更(insert/update/delete) |
| Deletion Vectors | 标记删除而非物理删除,提升更新性能 |
安装与配置¶
PySpark + Delta Lake¶
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession
# 配置 Spark + Delta Lake
builder = (
SparkSession.builder
.appName("DeltaLakeDemo")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()
delta-rs(独立 Python 包,无需 Spark)¶
from deltalake import DeltaTable, write_deltalake
import pandas as pd
# 不需要 Spark!
df = pd.DataFrame({"id": [1, 2, 3], "name": ["a", "b", "c"]})
write_deltalake("my_table", df)
dt = DeltaTable("my_table")
print(dt.to_pandas())
Polars + Delta Lake¶
import polars as pl
# 读取 Delta 表
df = pl.read_delta("my_table")
# 写入 Delta 表
df.write_delta("output_table", mode="overwrite")
DuckDB + Delta Lake¶
快速上手:5 分钟最小示例¶
使用 delta-rs(无需 Spark)¶
from deltalake import DeltaTable, write_deltalake
import pandas as pd
# 1. 创建 Delta 表
df = pd.DataFrame({
"id": [1, 2, 3, 4, 5],
"name": ["张三", "李四", "王五", "赵六", "陈七"],
"amount": [100.5, 200.3, 150.0, 300.8, 250.1],
"date": pd.to_datetime(["2024-01-01", "2024-01-02", "2024-01-03", "2024-01-04", "2024-01-05"]),
})
write_deltalake("sales_table", df)
print("表已创建!")
# 2. 读取数据
dt = DeltaTable("sales_table")
print(dt.to_pandas())
# 3. 追加数据
new_data = pd.DataFrame({
"id": [6, 7],
"name": ["周八", "吴九"],
"amount": [180.0, 220.5],
"date": pd.to_datetime(["2024-01-06", "2024-01-07"]),
})
write_deltalake("sales_table", new_data, mode="append")
# 4. 查看历史版本
print("版本历史:")
for version in dt.history():
print(f" 版本 {version['version']}: {version['timestamp']}")
# 5. 时间旅行:读取旧版本
dt_v0 = DeltaTable("sales_table", version=0)
print(f"版本0有 {len(dt_v0.to_pandas())} 行")
dt_latest = DeltaTable("sales_table")
print(f"最新版本有 {len(dt_latest.to_pandas())} 行")
进阶用法¶
场景一:MERGE (Upsert) 操作¶
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
spark = ... # 配置好的 SparkSession
# 目标表
target = DeltaTable.forPath(spark, "sales_table")
# 源数据(可能包含新记录和更新记录)
source = spark.createDataFrame([
(1, "张三", 150.0), # 更新:金额变了
(8, "郑十", 300.0), # 新记录
], ["id", "name", "amount"])
# MERGE 操作
(target.alias("target")
.merge(
source.alias("source"),
"target.id = source.id" # 匹配条件
)
.whenMatchedUpdate(set={
"name": "source.name",
"amount": "source.amount",
})
.whenNotMatchedInsert(values={
"id": "source.id",
"name": "source.name",
"amount": "source.amount",
})
.execute()
)
场景二:时间旅行¶
from delta.tables import DeltaTable
# 按版本号查询
df_v0 = spark.read.format("delta").option("versionAsOf", 0).load("sales_table")
# 按时间戳查询
df_yesterday = (spark.read.format("delta")
.option("timestampAsOf", "2024-12-01T00:00:00")
.load("sales_table"))
# 回滚到指定版本
dt = DeltaTable.forPath(spark, "sales_table")
dt.restoreToVersion(2)
# 回滚到指定时间
dt.restoreToTimestamp("2024-12-01T00:00:00")
场景三:Schema 演进¶
# 添加新列(自动 schema 演进)
new_data = spark.createDataFrame([
(9, "新用户", 500.0, "VIP"), # 多了一个 tier 列
], ["id", "name", "amount", "tier"])
(new_data.write
.format("delta")
.mode("append")
.option("mergeSchema", "true") # 允许 schema 演进
.save("sales_table"))
# 查看当前 schema
dt = DeltaTable.forPath(spark, "sales_table")
dt.toDF().printSchema()
场景四:Z-Ordering 优化查询性能¶
from delta.tables import DeltaTable
dt = DeltaTable.forPath(spark, "sales_table")
# Z-ORDER:重新组织数据布局,让常查询的列物理上聚集在一起
dt.optimize().executeZOrderBy("date", "region")
# OPTIMIZE:合并小文件
dt.optimize().executeCompaction()
-- SQL 方式
OPTIMIZE sales_table ZORDER BY (date, region);
OPTIMIZE sales_table WHERE date >= '2024-01-01';
场景五:Change Data Feed(变更数据捕获)¶
# 启用 CDF
spark.sql("""
ALTER TABLE sales_table
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
# 读取变更数据
changes = (spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 5)
.option("endingVersion", 10)
.table("sales_table"))
# 变更类型:insert, update_preimage, update_postimage, delete
changes.show()
场景六:流式读写¶
# 流式读取 Delta 表
stream_df = (spark.readStream
.format("delta")
.load("sales_table"))
# 流式写入 Delta 表
query = (stream_df
.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/tmp/checkpoint")
.start("output_table"))
# 流表合一:同一个 Delta 表同时支持批处理和流处理
场景七:数据清理与维护¶
from delta.tables import DeltaTable
dt = DeltaTable.forPath(spark, "sales_table")
# 清理旧版本文件(保留最近7天)
dt.vacuum(retentionHours=168)
# 查看表详细信息
dt.detail().show()
# 查看表历史
dt.history().show()
# 删除满足条件的数据
dt.delete("date < '2023-01-01'")
# 更新数据
dt.update(
condition="region = '华东'",
set={"discount": "0.1"}
)
场景八:云存储集成(S3)¶
# delta-rs 读写 S3
from deltalake import DeltaTable, write_deltalake
import pandas as pd
storage_options = {
"AWS_ACCESS_KEY_ID": "xxx",
"AWS_SECRET_ACCESS_KEY": "yyy",
"AWS_REGION": "us-east-1",
}
# 写入 S3
df = pd.DataFrame({"col1": [1, 2], "col2": ["a", "b"]})
write_deltalake(
"s3://my-bucket/my-delta-table",
df,
storage_options=storage_options,
)
# 读取 S3
dt = DeltaTable("s3://my-bucket/my-delta-table", storage_options=storage_options)
print(dt.to_pandas())
常见问题与排错¶
问题一:并发写入冲突¶
症状:ConcurrentModificationException
解决:Delta Lake 使用乐观并发控制。对于追加操作通常不会冲突,但对于 MERGE/UPDATE/DELETE 可能冲突。
问题二:小文件问题¶
# 定期执行 OPTIMIZE 合并小文件
dt.optimize().executeCompaction()
# 自动优化(Databricks 上)
spark.sql("ALTER TABLE t SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true)")
spark.sql("ALTER TABLE t SET TBLPROPERTIES (delta.autoOptimize.autoCompact = true)")
问题三:VACUUM 后时间旅行失败¶
VACUUM 会删除旧版本的数据文件。如果你 vacuum 了 7 天前的文件,就不能再查询 7 天前的版本。
问题四:Schema 不匹配¶
# 方案一:启用 schema 演进
df.write.format("delta").mode("append").option("mergeSchema", "true").save(path)
# 方案二:覆盖 schema
df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(path)
问题五:delta-rs vs PySpark Delta 的区别¶
| 特性 | delta-rs (deltalake) | PySpark Delta |
|---|---|---|
| 依赖 | 无需 Spark/JVM | 需要 Spark |
| 安装 | pip install deltalake | 需配置 Spark |
| 性能 | Rust 原生 | JVM 开销 |
| 功能 | 读写/MERGE/VACUUM | 完整功能 |
| 流处理 | 不支持 | 支持 |
| 集群 | 单机 | 分布式 |
参考资源¶
- 官方文档:https://delta.io/
- GitHub:https://github.com/delta-io/delta
- delta-rs (Rust/Python):https://github.com/delta-io/delta-rs
- Delta Lake 快速入门:https://docs.delta.io/latest/quick-start.html
- Delta UniForm:https://docs.delta.io/latest/delta-uniform.html
- Databricks Delta Lake 文档:https://docs.databricks.com/delta/index.html
- Delta Lake Blog:https://delta.io/blog
- Slack 社区:https://go.delta.io/slack