跳转至

Delta Lake 完全指南

为什么要学 Delta Lake

  1. 给数据湖带来 ACID 事务:传统数据湖(Parquet/CSV 文件在 S3/HDFS 上)没有事务保障——并发写入可能损坏数据,失败的作业留下脏数据,读取可能看到不一致状态。Delta Lake 在数据湖之上加了一个事务日志层,让数据湖拥有数据仓库级别的可靠性。

  2. 时间旅行(Time Travel):Delta Lake 保留数据的历史版本。你可以查询"昨天的数据是什么样",回滚误操作,做审计追踪。这对数据管道调试和合规性要求至关重要。

  3. Schema 演进与强制:当数据结构变化时(新增列、修改类型),Delta Lake 可以安全地演进 schema,同时防止意外的 schema 破坏。不会因为上游数据格式变了就导致整个管道崩溃。

  4. Lakehouse 架构的基石:Delta Lake 是 Databricks 提出的 Lakehouse 架构的核心。它让你在同一个数据存储上同时运行 BI 查询和 ML 训练,不需要把数据复制到数据仓库。

  5. 开源且生态广泛:Delta Lake 是 Linux Foundation 下的开源项目,支持 PySpark、Spark、Rust (delta-rs)、Polars、pandas 等多种引擎。不绑定 Databricks 平台。


核心概念详解

Delta Lake 是什么(白话解释)

想象你有一堆 Excel 文件存在一个共享文件夹里。多人同时修改可能导致冲突,删错了文件就没了,也不知道谁在什么时候改了什么。

Delta Lake 就像给这些文件加了一个"智能管家": - 记录每次修改的日志(谁改了什么、什么时候改的) - 保证不会有两个人同时写坏数据 - 随时可以回到之前的任何版本 - 自动检查数据格式是否正确

在技术上,Delta Lake 就是在 Parquet 文件的基础上加了一个 _delta_log/ 目录,里面存放 JSON 格式的事务日志。

Delta Lake 事务日志

my_table/
├── _delta_log/                    # 事务日志
│   ├── 00000000000000000000.json  # 第0次提交
│   ├── 00000000000000000001.json  # 第1次提交
│   ├── 00000000000000000002.json  # 第2次提交
│   └── 00000000000000000010.checkpoint.parquet  # 检查点
├── part-00000-xxx.parquet         # 数据文件
├── part-00001-xxx.parquet
└── part-00002-xxx.parquet

每个 JSON 日志记录了: - 添加了哪些文件(add) - 删除了哪些文件(remove) - 元数据变更(metadata) - 事务标识

Delta Lake vs Apache Iceberg vs Apache Hudi 对比

特性Delta LakeApache IcebergApache Hudi
创始方DatabricksNetflix → ApacheUber → Apache
协议Delta Log (JSON)Manifest 文件Timeline
ACID 事务支持支持支持
时间旅行支持支持支持(有限)
Schema 演进支持支持(更灵活)支持
分区演进需重写原生支持部分支持
MERGE/UPSERT原生支持支持核心特性
流处理Spark Structured StreamingFlink/Spark核心特性
小文件合并OPTIMIZE + Z-ORDER自动 compactioncompaction
Spark 集成最深
非Spark引擎delta-rs, Polars, DuckDBTrino, Flink, Presto有限
云存储支持S3/ADLS/GCSS3/ADLS/GCSS3/ADLS/GCS
开放标准Delta UniForm 兼容最开放相对封闭
社区规模大(Databricks背书)快速增长中等

核心特性详解

特性说明
ACID 事务每次写入都是原子操作,要么完全成功,要么完全回滚
乐观并发多个写入者可以并发操作,冲突时自动重试或报错
时间旅行通过版本号或时间戳查询历史数据
Schema 强制写入时检查 schema 是否匹配,防止数据污染
Schema 演进安全地添加、删除、重命名列
MERGE (Upsert)基于条件合并数据:匹配则更新,不匹配则插入
Z-Ordering按指定列重新组织数据布局,优化查询性能
Liquid Clustering自动数据聚类,替代手动 Z-Order
Change Data Feed追踪数据变更(insert/update/delete)
Deletion Vectors标记删除而非物理删除,提升更新性能

安装与配置

PySpark + Delta Lake

# 安装
pip install pyspark delta-spark

# 或使用 conda
conda install -c conda-forge delta-spark pyspark
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

# 配置 Spark + Delta Lake
builder = (
    SparkSession.builder
    .appName("DeltaLakeDemo")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

delta-rs(独立 Python 包,无需 Spark)

pip install deltalake
from deltalake import DeltaTable, write_deltalake
import pandas as pd

# 不需要 Spark!
df = pd.DataFrame({"id": [1, 2, 3], "name": ["a", "b", "c"]})
write_deltalake("my_table", df)

dt = DeltaTable("my_table")
print(dt.to_pandas())

Polars + Delta Lake

pip install polars deltalake
import polars as pl

# 读取 Delta 表
df = pl.read_delta("my_table")

# 写入 Delta 表
df.write_delta("output_table", mode="overwrite")

DuckDB + Delta Lake

-- 安装扩展
INSTALL delta;
LOAD delta;

-- 读取 Delta 表
SELECT * FROM delta_scan('s3://bucket/my_table');

快速上手:5 分钟最小示例

使用 delta-rs(无需 Spark)

from deltalake import DeltaTable, write_deltalake
import pandas as pd

# 1. 创建 Delta 表
df = pd.DataFrame({
    "id": [1, 2, 3, 4, 5],
    "name": ["张三", "李四", "王五", "赵六", "陈七"],
    "amount": [100.5, 200.3, 150.0, 300.8, 250.1],
    "date": pd.to_datetime(["2024-01-01", "2024-01-02", "2024-01-03", "2024-01-04", "2024-01-05"]),
})

write_deltalake("sales_table", df)
print("表已创建!")

# 2. 读取数据
dt = DeltaTable("sales_table")
print(dt.to_pandas())

# 3. 追加数据
new_data = pd.DataFrame({
    "id": [6, 7],
    "name": ["周八", "吴九"],
    "amount": [180.0, 220.5],
    "date": pd.to_datetime(["2024-01-06", "2024-01-07"]),
})
write_deltalake("sales_table", new_data, mode="append")

# 4. 查看历史版本
print("版本历史:")
for version in dt.history():
    print(f"  版本 {version['version']}: {version['timestamp']}")

# 5. 时间旅行:读取旧版本
dt_v0 = DeltaTable("sales_table", version=0)
print(f"版本0有 {len(dt_v0.to_pandas())} 行")

dt_latest = DeltaTable("sales_table")
print(f"最新版本有 {len(dt_latest.to_pandas())} 行")

进阶用法

场景一:MERGE (Upsert) 操作

from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = ...  # 配置好的 SparkSession

# 目标表
target = DeltaTable.forPath(spark, "sales_table")

# 源数据(可能包含新记录和更新记录)
source = spark.createDataFrame([
    (1, "张三", 150.0),   # 更新:金额变了
    (8, "郑十", 300.0),   # 新记录
], ["id", "name", "amount"])

# MERGE 操作
(target.alias("target")
    .merge(
        source.alias("source"),
        "target.id = source.id"  # 匹配条件
    )
    .whenMatchedUpdate(set={
        "name": "source.name",
        "amount": "source.amount",
    })
    .whenNotMatchedInsert(values={
        "id": "source.id",
        "name": "source.name",
        "amount": "source.amount",
    })
    .execute()
)

场景二:时间旅行

from delta.tables import DeltaTable

# 按版本号查询
df_v0 = spark.read.format("delta").option("versionAsOf", 0).load("sales_table")

# 按时间戳查询
df_yesterday = (spark.read.format("delta")
    .option("timestampAsOf", "2024-12-01T00:00:00")
    .load("sales_table"))

# 回滚到指定版本
dt = DeltaTable.forPath(spark, "sales_table")
dt.restoreToVersion(2)

# 回滚到指定时间
dt.restoreToTimestamp("2024-12-01T00:00:00")

场景三:Schema 演进

# 添加新列(自动 schema 演进)
new_data = spark.createDataFrame([
    (9, "新用户", 500.0, "VIP"),  # 多了一个 tier 列
], ["id", "name", "amount", "tier"])

(new_data.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")  # 允许 schema 演进
    .save("sales_table"))

# 查看当前 schema
dt = DeltaTable.forPath(spark, "sales_table")
dt.toDF().printSchema()

场景四:Z-Ordering 优化查询性能

from delta.tables import DeltaTable

dt = DeltaTable.forPath(spark, "sales_table")

# Z-ORDER:重新组织数据布局,让常查询的列物理上聚集在一起
dt.optimize().executeZOrderBy("date", "region")

# OPTIMIZE:合并小文件
dt.optimize().executeCompaction()
-- SQL 方式
OPTIMIZE sales_table ZORDER BY (date, region);
OPTIMIZE sales_table WHERE date >= '2024-01-01';

场景五:Change Data Feed(变更数据捕获)

# 启用 CDF
spark.sql("""
    ALTER TABLE sales_table
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# 读取变更数据
changes = (spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 5)
    .option("endingVersion", 10)
    .table("sales_table"))

# 变更类型:insert, update_preimage, update_postimage, delete
changes.show()

场景六:流式读写

# 流式读取 Delta 表
stream_df = (spark.readStream
    .format("delta")
    .load("sales_table"))

# 流式写入 Delta 表
query = (stream_df
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/checkpoint")
    .start("output_table"))

# 流表合一:同一个 Delta 表同时支持批处理和流处理

场景七:数据清理与维护

from delta.tables import DeltaTable

dt = DeltaTable.forPath(spark, "sales_table")

# 清理旧版本文件(保留最近7天)
dt.vacuum(retentionHours=168)

# 查看表详细信息
dt.detail().show()

# 查看表历史
dt.history().show()

# 删除满足条件的数据
dt.delete("date < '2023-01-01'")

# 更新数据
dt.update(
    condition="region = '华东'",
    set={"discount": "0.1"}
)

场景八:云存储集成(S3)

# delta-rs 读写 S3
from deltalake import DeltaTable, write_deltalake
import pandas as pd

storage_options = {
    "AWS_ACCESS_KEY_ID": "xxx",
    "AWS_SECRET_ACCESS_KEY": "yyy",
    "AWS_REGION": "us-east-1",
}

# 写入 S3
df = pd.DataFrame({"col1": [1, 2], "col2": ["a", "b"]})
write_deltalake(
    "s3://my-bucket/my-delta-table",
    df,
    storage_options=storage_options,
)

# 读取 S3
dt = DeltaTable("s3://my-bucket/my-delta-table", storage_options=storage_options)
print(dt.to_pandas())

常见问题与排错

问题一:并发写入冲突

症状ConcurrentModificationException

解决:Delta Lake 使用乐观并发控制。对于追加操作通常不会冲突,但对于 MERGE/UPDATE/DELETE 可能冲突。

# 设置重试次数
spark.conf.set("spark.databricks.delta.retryWriteConflict.limit", "3")

问题二:小文件问题

# 定期执行 OPTIMIZE 合并小文件
dt.optimize().executeCompaction()

# 自动优化(Databricks 上)
spark.sql("ALTER TABLE t SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true)")
spark.sql("ALTER TABLE t SET TBLPROPERTIES (delta.autoOptimize.autoCompact = true)")

问题三:VACUUM 后时间旅行失败

Error: Version X not found

VACUUM 会删除旧版本的数据文件。如果你 vacuum 了 7 天前的文件,就不能再查询 7 天前的版本。

# 保留更长的历史
dt.vacuum(retentionHours=720)  # 30天

# 或者不要频繁 vacuum

问题四:Schema 不匹配

AnalysisException: A schema mismatch detected when writing to the Delta table
# 方案一:启用 schema 演进
df.write.format("delta").mode("append").option("mergeSchema", "true").save(path)

# 方案二:覆盖 schema
df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(path)

问题五:delta-rs vs PySpark Delta 的区别

特性delta-rs (deltalake)PySpark Delta
依赖无需 Spark/JVM需要 Spark
安装pip install deltalake需配置 Spark
性能Rust 原生JVM 开销
功能读写/MERGE/VACUUM完整功能
流处理不支持支持
集群单机分布式

参考资源

  • 官方文档:https://delta.io/
  • GitHub:https://github.com/delta-io/delta
  • delta-rs (Rust/Python):https://github.com/delta-io/delta-rs
  • Delta Lake 快速入门:https://docs.delta.io/latest/quick-start.html
  • Delta UniForm:https://docs.delta.io/latest/delta-uniform.html
  • Databricks Delta Lake 文档:https://docs.databricks.com/delta/index.html
  • Delta Lake Blog:https://delta.io/blog
  • Slack 社区:https://go.delta.io/slack