
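Apache Iceberg: The Complete Guide

Why Learn Apache Iceberg

  1. The most open table format standard: Apache Iceberg is widely adopted by companies such as AWS, Google, Apple, Netflix, and Snowflake, and has become a de facto standard table format for data lakes. It is not tied to any particular engine or vendor: Spark, Flink, Trino, Presto, Hive, and Dremio all support it natively.

  2. Partition evolution, painless changes to partitioning: with a traditional Hive table, changing the partitioning scheme means rewriting all of the data. Iceberg's partition evolution changes the partition spec as a metadata-only operation; existing files keep their old layout and only new writes use the new spec. At data lake scale, this avoids what would otherwise be a full table rewrite.

  3. Hidden partitioning, no more partition-column bookkeeping: you define a partition transform (for example, by month) when creating the table. Queries filter on the source column itself, such as event_time, without a separate partition-column predicate, and the engine prunes partitions automatically.

  4. Snapshot isolation and concurrency control: snapshot-based reads give every query a consistent view, so readers and writers do not block each other. Multiple writers can commit safely at the same time through optimistic concurrency control: each commit atomically swaps the table's metadata pointer in the catalog, and a writer that loses the race retries against the new state.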

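  5. Reliable management of petabyte-scale data: Iceberg is designed to manage data at the scale of hundreds of petabytes. Its metadata tree (manifest list plus manifest files) lets the planner skip whole manifests and individual files using partition summaries and column statistics, without listing directories, so query planning stays fast even for tables with millions of files.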


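Core Concepts in Depth

What Iceberg Is (in Plain Terms)

A traditional data lake is just "a pile of files sitting on S3/HDFS". To query the data you have to know where the files are, what format they use, and how they are partitioned, and switching to a different query engine means configuring all of that again.

Iceberg acts like an index for a table: it adds a metadata layer on top of the files that records which files belong to the table, what data each file holds, and statistics about that data (such as per-column minimum and maximum values). Any engine that can read this metadata can query the data efficiently.

Metadata Architecture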

┌────────────────────────────────────────────────┐
│ Catalog                                        │
│ Points to the table's current metadata file    │
└───────────────────────┬────────────────────────┘
                        ▼
┌────────────────────────────────────────────────┐
│ Metadata File (metadata/v3.metadata.json)      │
│ - Table schema                                 │
│ - Partition spec                               │
│ - Current snapshot ID                          │
│ - List of snapshots                            │
└───────────────────────┬────────────────────────┘
                        ▼
┌────────────────────────────────────────────────┐
│ Manifest List (snap-xxx-xxx.avro)              │
│ - Lists every manifest in the snapshot         │
│ - Partition-range summary per manifest         │
└───────────────────────┬────────────────────────┘
                        ▼
┌────────────────────────────────────────────────┐
│ Manifest File (xxx-m0.avro)                    │
│ - Data file paths                              │
│ - Per-file column stats (min/max/null_count)   │
│ - File size, record count                      │
└───────────────────────┬────────────────────────┘
                        ▼
┌────────────────────────────────────────────────┐
│ Data Files (data/*.parquet)                    │
│ - Actual data (Parquet/ORC/Avro)               │
└────────────────────────────────────────────────┘
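
To make the diagram concrete, here is a minimal Python sketch of how a planner could walk this tree for a range filter on event_time. The classes (DataFile, Manifest, Snapshot, TableMetadata) and the plan_scan function are hypothetical simplifications, not PyIceberg's API. In real Iceberg the manifest list stores partition summaries and manifests store per-column statistics; here both levels simply track an event_time range.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class DataFile:
    path: str
    min_ts: int  # lower bound of event_time in this file (column stat)
    max_ts: int  # upper bound of event_time in this file (column stat)


@dataclass
class Manifest:
    path: str
    files: List[DataFile]

    @property
    def bounds(self) -> Tuple[int, int]:
        # Stands in for the per-manifest summary kept in the manifest list
        return min(f.min_ts for f in self.files), max(f.max_ts for f in self.files)


@dataclass
class Snapshot:
    snapshot_id: int
    manifests: List[Manifest]  # stands in for the manifest list


@dataclass
class TableMetadata:
    current_snapshot_id: int
    snapshots: Dict[int, Snapshot] = field(default_factory=dict)


def plan_scan(catalog: Dict[str, TableMetadata], table: str, lo: int, hi: int) -> List[str]:
    """Return paths of data files that may hold rows with lo <= event_time <= hi."""
    meta = catalog[table]                                 # catalog -> current metadata
    snapshot = meta.snapshots[meta.current_snapshot_id]   # metadata -> current snapshot
    selected = []
    for manifest in snapshot.manifests:                   # manifest list -> manifests
        m_lo, m_hi = manifest.bounds
        if m_hi < lo or m_lo > hi:
            continue                                      # prune a whole manifest unread
        for f in manifest.files:                          # manifest -> data files
            if f.max_ts >= lo and f.min_ts <= hi:
                selected.append(f.path)
    return selected


m1 = Manifest("m1.avro", [DataFile("a.parquet", 1, 5), DataFile("b.parquet", 6, 10)])
m2 = Manifest("m2.avro", [DataFile("c.parquet", 20, 25)])
catalog = {"analytics.events": TableMetadata(1, {1: Snapshot(1, [m1, m2])})}
print(plan_scan(catalog, "analytics.events", 7, 21))  # ['b.parquet', 'c.parquet']
```

Only the catalog entry is a mutable pointer; everything below it is immutable files, which is what lets different engines plan against the same snapshot.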

Hidden Partitioning

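| Hive-style partitioning | Iceberg hidden partitioning |
|---|---|
| Needs an extra partition column | Partition values computed from data columns |
| WHERE year=2024 AND month=12 | WHERE event_time > '2024-12-01' |
| Changing partitioning requires rewriting data | Partition evolution, no rewrite |
| Users must know the partition scheme | Transparent; the engine prunes automatically |
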
-- Define partition transforms when creating an Iceberg table
CREATE TABLE events (
    event_id BIGINT,
    event_time TIMESTAMP,
    user_id BIGINT,
    event_type STRING,
    payload STRING
) USING iceberg
PARTITIONED BY (
    days(event_time),      -- partition by day (derived from event_time)
    bucket(16, user_id)    -- hash user_id into 16 buckets
);

-- Queries filter on event_time directly; no partition-column predicate is needed
SELECT * FROM events WHERE event_time > '2024-12-01';
-- Iceberg prunes the scan to day partitions from 2024-12-01 onward

Partition Transforms

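| Transform | Meaning | Example |
|---|---|---|
| year(ts) | By year (years since 1970) | PARTITIONED BY (year(event_time)) |
| month(ts) | By month (months since 1970-01) | PARTITIONED BY (month(event_time)) |
| day(ts) | By day (days since 1970-01-01) | PARTITIONED BY (day(event_time)) |
| hour(ts) | By hour (hours since 1970-01-01 00:00) | PARTITIONED BY (hour(event_time)) |
| bucket(n, col) | Hash col into n buckets (32-bit Murmur3 per the spec) | PARTITIONED BY (bucket(16, user_id)) |
| truncate(n, col) | Numbers: round down to a multiple of n; strings: first n characters | PARTITIONED BY (truncate(10, city)) |
| identity(col) | Use the raw value | PARTITIONED BY (identity(country)) |

Spark SQL also accepts the plural forms years(...), months(...), days(...), and hours(...), as in the CREATE TABLE example above.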
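
The temporal and truncate transforms are simple enough to write out. The sketch below follows the definitions in the Iceberg table spec as described in the table above, assuming naive datetimes are interpreted as UTC. bucket is omitted because the spec defines it with a 32-bit Murmur3 hash. The function names are hypothetical, not part of any Iceberg library.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # naive datetimes are treated as UTC in this sketch


def year_transform(ts: datetime) -> int:
    return ts.year - 1970                            # years since 1970


def month_transform(ts: datetime) -> int:
    return (ts.year - 1970) * 12 + (ts.month - 1)    # months since 1970-01


def day_transform(ts: datetime) -> int:
    return (ts.date() - EPOCH.date()).days           # days since 1970-01-01


def hour_transform(ts: datetime) -> int:
    # Floor division, so timestamps before 1970 map to negative hours
    return (ts - EPOCH) // timedelta(hours=1)


def truncate_transform(width: int, value):
    if isinstance(value, int):
        # Python's % is non-negative for width > 0, so this rounds toward -infinity
        return value - (value % width)
    if isinstance(value, str):
        return value[:width]                         # first `width` code points
    raise TypeError(f"unsupported type: {type(value).__name__}")


ts = datetime(2024, 12, 1, 10, 0)
print(year_transform(ts), month_transform(ts), day_transform(ts))    # 54 659 20058
print(truncate_transform(10, -1), truncate_transform(3, "iceberg"))  # -10 ice
```

These integers are the partition values recorded in manifests, which is why a filter on event_time can be turned into a filter on partitions.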

Installation and Configuration

PySpark + Iceberg

pip install "pyspark==3.5.*"   # must match the iceberg-spark-runtime-3.5_2.12 jar below
from pyspark.sql import SparkSession

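spark = (SparkSession.builder
    .appName("IcebergDemo")
    # Iceberg runtime for Spark 3.5 / Scala 2.12, downloaded from Maven
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1")
    # Enables Iceberg SQL extensions (CALL procedures, ALTER TABLE ... PARTITION FIELD, etc.)
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    # Registers a catalog named "local" backed by a Hadoop-style warehouse directory
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", "/tmp/iceberg-warehouse")
    .getOrCreate())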

PyIceberg (standalone Python library)

pip install "pyiceberg[pyarrow,pandas,duckdb,sql-sqlite,glue,s3fs]"
from pyiceberg.catalog import load_catalog

# Local catalog: SQLite-backed SQL catalog with a warehouse on the local filesystem
catalog = load_catalog("local", **{
    "type": "sql",
    "uri": "sqlite:///iceberg_catalog.db",
    "warehouse": "/tmp/iceberg-warehouse",
})

# AWS Glue catalog
catalog = load_catalog("glue", **{
    "type": "glue",
    "s3.region": "us-east-1",
})

# REST catalog
catalog = load_catalog("rest", **{
    "type": "rest",
    "uri": "http://localhost:8181",
})

Trino + Iceberg

# etc/catalog/iceberg.properties
connector.name=iceberg
iceberg.catalog.type=hive_metastore
hive.metastore.uri=thrift://localhost:9083
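
Flink + Iceberg

-- Flink SQL: register an Iceberg catalog (this CREATE CATALOG syntax is Flink's, not Trino's)
CREATE CATALOG iceberg_catalog WITH (
    'type' = 'iceberg',
    'catalog-type' = 'hadoop',
    'warehouse' = 's3://my-bucket/warehouse'
);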

Quick Start: A 5-Minute Minimal Example

Using PyIceberg + DuckDB

from datetime import datetime

import pyarrow as pa
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.expressions import GreaterThanOrEqual
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.schema import Schema
from pyiceberg.transforms import DayTransform
from pyiceberg.types import NestedField, StringType, LongType, TimestampType

# 1. Create a catalog (metadata in SQLite, data under a local warehouse directory)
catalog = SqlCatalog("demo", uri="sqlite:///demo_catalog.db", warehouse="file:///tmp/iceberg-demo")

# 2. Create a namespace
catalog.create_namespace("analytics")

# 3. Define the schema (every field gets a stable field ID)
schema = Schema(
    NestedField(1, "event_id", LongType(), required=True),
    NestedField(2, "user_id", LongType(), required=True),
    NestedField(3, "event_type", StringType()),
    NestedField(4, "event_time", TimestampType()),
)

# 4. Define the partition spec: day(event_time); source_id=4 is event_time's field ID
partition_spec = PartitionSpec(
    PartitionField(source_id=4, field_id=1000, transform=DayTransform(), name="event_day")
)

# 5. Create the table
table = catalog.create_table(
    identifier="analytics.events",
    schema=schema,
    partition_spec=partition_spec,
)

# 6. Write data. event_id and user_id are declared non-nullable so the Arrow schema
#    matches the required=True fields; a nullable Arrow field may be rejected by append().
arrow_schema = pa.schema([
    pa.field("event_id", pa.int64(), nullable=False),
    pa.field("user_id", pa.int64(), nullable=False),
    pa.field("event_type", pa.string()),
    pa.field("event_time", pa.timestamp("us")),
])
data = pa.table({
    "event_id": [1, 2, 3, 4, 5],
    "user_id": [101, 102, 101, 103, 102],
    "event_type": ["click", "view", "purchase", "click", "view"],
    "event_time": [
        datetime(2024, 12, 1, 10, 0),
        datetime(2024, 12, 1, 11, 0),
        datetime(2024, 12, 2, 9, 0),
        datetime(2024, 12, 2, 14, 0),
        datetime(2024, 12, 3, 8, 0),
    ],
}, schema=arrow_schema)

table.append(data)

# 7. Query the data
df = table.scan().to_pandas()
print(df)

# The same scan can be loaded into DuckDB as a table named "events"
con = table.scan().to_duckdb(table_name="events")
print(con.execute("SELECT event_type, count(*) FROM events GROUP BY event_type").fetchall())

# 8. Scan with a row filter
scan_filtered = table.scan(
    row_filter=GreaterThanOrEqual("event_id", 3)
)
print(scan_filtered.to_pandas())

# 9. Inspect the snapshot history
for snapshot in table.history():
    print(f"Snapshot: {snapshot.snapshot_id}, Time: {snapshot.timestamp_ms}")

Advanced Usage

Scenario 1: Partition Evolution

-- Initial partitioning: by month
CREATE TABLE events (...) USING iceberg
PARTITIONED BY (month(event_time));

-- As data volume grows, switch to daily partitions (historical data is not rewritten!)
ALTER TABLE events
ADD PARTITION FIELD day(event_time);

-- Drop the old monthly partition field
ALTER TABLE events
DROP PARTITION FIELD month(event_time);

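-- Historical files keep their monthly layout; new files are partitioned by day
-- Iceberg plans each partition spec separately and prunes files under both layouts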
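
Why can old and new layouts coexist? Each data file records the ID of the partition spec it was written with, and the planner projects the query predicate through that spec's transform. A minimal sketch under simplifying assumptions: a date column stands in for the timestamp, and SPECS, DataFile, and files_on_or_after are hypothetical names.

```python
from dataclasses import dataclass
from datetime import date
from typing import List


def month_of(d: date) -> int:
    return (d.year - 1970) * 12 + d.month - 1   # months since 1970-01


def day_of(d: date) -> int:
    return (d - date(1970, 1, 1)).days          # days since 1970-01-01


# spec_id -> transform: spec 0 = month(event_time), spec 1 = day(event_time) after evolution
SPECS = {0: month_of, 1: day_of}


@dataclass
class DataFile:
    path: str
    spec_id: int          # partition spec this file was written with
    partition_value: int  # transform result recorded for the file in its manifest


def files_on_or_after(files: List[DataFile], start: date) -> List[str]:
    """Keep files that may contain rows with event_time >= start."""
    kept = []
    for f in files:
        transform = SPECS[f.spec_id]
        # Transforms preserve order, so event_time >= start implies
        # transform(event_time) >= transform(start) under every spec
        if f.partition_value >= transform(start):
            kept.append(f.path)
    return kept


files = [
    DataFile("old-2024-10.parquet", 0, month_of(date(2024, 10, 1))),
    DataFile("old-2024-11.parquet", 0, month_of(date(2024, 11, 1))),
    DataFile("new-2024-11-30.parquet", 1, day_of(date(2024, 11, 30))),
    DataFile("new-2024-12-02.parquet", 1, day_of(date(2024, 12, 2))),
]
print(files_on_or_after(files, date(2024, 11, 15)))
```

The November monthly file survives pruning for a start date of 2024-11-15 because it may still hold rows from the second half of the month; the coarser old layout just prunes less precisely.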

Scenario 2: Schema Evolution

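-- All of these are metadata-only changes: Iceberg tracks columns by field ID,
-- so no data files are rewritten

-- Add columns
ALTER TABLE events ADD COLUMNS (
    browser STRING COMMENT 'browser type',
    country STRING COMMENT 'country'
);

-- Rename a column
ALTER TABLE events RENAME COLUMN browser TO user_agent;

-- Drop a column
ALTER TABLE events DROP COLUMN country;

-- Widen a column type (allowed promotions include int→bigint, float→double,
-- and increasing decimal precision). user_id is already BIGINT in this table,
-- so this line only illustrates the syntax.
ALTER TABLE events ALTER COLUMN user_id TYPE bigint;

-- Reorder columns
ALTER TABLE events ALTER COLUMN user_agent AFTER event_type;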
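
These changes are cheap because Iceberg resolves columns by field ID rather than by name or position. A minimal sketch of that idea, assuming rows from an old data file are stored keyed by field ID; the read_with_schema helper and the ID assignments are hypothetical.

```python
from typing import Dict, List, Tuple

# A row from a file written before any schema change, keyed by field ID:
# 1=event_id, 2=event_type, 3=browser, 4=country
old_rows: List[Dict[int, object]] = [{1: 101, 2: "click", 3: "Chrome", 4: "DE"}]

# Current schema after RENAME browser -> user_agent (ID 3 kept) and DROP country (ID 4 gone)
current_schema: List[Tuple[int, str]] = [(1, "event_id"), (2, "event_type"), (3, "user_agent")]


def read_with_schema(rows, schema):
    """Project stored rows onto a schema by field ID; IDs missing from a row read as None."""
    return [{name: row.get(field_id) for field_id, name in schema} for row in rows]


print(read_with_schema(old_rows, current_schema))
# Re-adding a column named "country" gets a fresh ID (5), so the dropped values stay hidden
print(read_with_schema(old_rows, current_schema + [(5, "country")]))
```

A name-based reader would instead lose browser's values after the rename, or resurrect the dropped country values when a column with the same name is added back.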

Scenario 3: Snapshot Isolation and Time Travel

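-- View the snapshot history
SELECT * FROM local.analytics.events.history;

-- Query by snapshot ID
SELECT * FROM local.analytics.events VERSION AS OF 1234567890;

-- Query by timestamp
SELECT * FROM local.analytics.events TIMESTAMP AS OF '2024-12-01 00:00:00';

-- Roll back to a given snapshot (moves the current pointer; no data files are rewritten)
CALL local.system.rollback_to_snapshot('analytics.events', 1234567890);

-- Cherry-pick a snapshot: apply its changes as a new snapshot on top of the current state
-- (typically used to publish a staged write-audit-publish snapshot)
CALL local.system.cherrypick_snapshot('analytics.events', 1234567891);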
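
Time travel by timestamp amounts to finding the latest snapshot committed at or before the requested time, and rollback only moves the current-snapshot pointer. The TableHistory class below is a hypothetical illustration of that lookup, not Iceberg's implementation.

```python
import bisect
from typing import List, Optional, Tuple


class TableHistory:
    """Simplified snapshot log plus a current-snapshot pointer."""

    def __init__(self) -> None:
        self.log: List[Tuple[int, int]] = []  # (timestamp_ms, snapshot_id) in commit order
        self.current_snapshot_id: Optional[int] = None

    def commit(self, snapshot_id: int, timestamp_ms: int) -> None:
        self.log.append((timestamp_ms, snapshot_id))
        self.current_snapshot_id = snapshot_id

    def snapshot_as_of(self, timestamp_ms: int) -> int:
        # Latest snapshot whose commit time is <= timestamp_ms
        times = [t for t, _ in self.log]
        idx = bisect.bisect_right(times, timestamp_ms) - 1
        if idx < 0:
            raise ValueError("no snapshot exists at or before that time")
        return self.log[idx][1]

    def rollback_to(self, snapshot_id: int) -> None:
        # Only the pointer moves; no data files are rewritten or deleted
        if snapshot_id not in {sid for _, sid in self.log}:
            raise ValueError(f"unknown snapshot {snapshot_id}")
        self.current_snapshot_id = snapshot_id


history = TableHistory()
for sid, ts in [(1, 1_000), (2, 2_000), (3, 3_000)]:
    history.commit(sid, ts)
print(history.snapshot_as_of(2_500))  # 2
history.rollback_to(1)
print(history.current_snapshot_id)    # 1
```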

Scenario 4: MERGE (Upsert / SCD Type 2)

-- Standard upsert
MERGE INTO events AS target
USING updates AS source
ON target.event_id = source.event_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

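-- SCD Type 2 (slowly changing dimension). A changed customer needs two actions:
-- close the current row AND insert the new version. One ON match can do only one,
-- so each changed row is staged twice: keyed (to close) and with a NULL key (to insert).
MERGE INTO dim_customer AS target
USING (
    SELECT customer_id AS merge_key, * FROM staged_updates
    UNION ALL
    SELECT NULL AS merge_key, s.*
    FROM staged_updates AS s
    JOIN dim_customer AS t
      ON s.customer_id = t.customer_id AND t.is_current = true
    WHERE t.address != s.address
) AS source
ON target.customer_id = source.merge_key AND target.is_current = true
WHEN MATCHED AND target.address != source.address THEN
    UPDATE SET is_current = false, end_date = current_date()
WHEN NOT MATCHED THEN
    INSERT (customer_id, name, address, is_current, start_date, end_date)
    VALUES (source.customer_id, source.name, source.address, true, current_date(), null);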
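
The SCD Type 2 rules are easier to check outside SQL. This hypothetical in-memory sketch applies the rules such a merge is meant to implement: unchanged customers are left alone, a changed customer has its current row closed and a new current row inserted, and a new customer simply gets an inserted row. The scd2_merge function and the row layout are assumptions for illustration.

```python
from datetime import date
from typing import Dict, List


def scd2_merge(dim: List[Dict], updates: List[Dict], today: date) -> List[Dict]:
    """Apply SCD Type 2 updates to `dim` in place and return it."""
    current = {row["customer_id"]: row for row in dim if row["is_current"]}
    for u in updates:
        old = current.get(u["customer_id"])
        if old is not None and old["address"] == u["address"]:
            continue                          # unchanged: no new version
        if old is not None:
            old["is_current"] = False         # close the current version
            old["end_date"] = today
        # New customer, or new version of a changed customer
        dim.append({**u, "is_current": True, "start_date": today, "end_date": None})
    return dim


dim = [{"customer_id": 1, "name": "Ann", "address": "Old St", "is_current": True,
        "start_date": date(2024, 1, 1), "end_date": None}]
updates = [{"customer_id": 1, "name": "Ann", "address": "New St"},
           {"customer_id": 2, "name": "Bob", "address": "Main St"}]
for row in scd2_merge(dim, updates, date(2024, 12, 1)):
    print(row["customer_id"], row["address"], row["is_current"], row["end_date"])
```

One changed input row produces two effects (a close and an insert), which is why a MERGE that only joins on customer_id cannot insert the new version by itself.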

Scenario 5: Table Maintenance

-- Compact small files (compaction), sorting rows by event_time while rewriting
CALL local.system.rewrite_data_files(
    table => 'analytics.events',
    strategy => 'sort',
    sort_order => 'event_time ASC NULLS LAST'
);

-- Expire snapshots older than the timestamp, but always keep the last 5
CALL local.system.expire_snapshots(
    table => 'analytics.events',
    older_than => TIMESTAMP '2024-11-01 00:00:00',
    retain_last => 5
);

-- Remove orphan files (files in the table location that no metadata references)
CALL local.system.remove_orphan_files(
    table => 'analytics.events',
    older_than => TIMESTAMP '2024-11-01 00:00:00'
);

-- Rewrite manifest files (consolidates many small manifests)
CALL local.system.rewrite_manifests('analytics.events');
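
rewrite_data_files defaults to a bin-pack strategy (the call above selects sort instead). As a rough illustration of grouping small files toward a target size, here is a first-fit-decreasing sketch. plan_bins is hypothetical; Iceberg's real planner also groups files by partition and applies size thresholds that are omitted here.

```python
from typing import List


def plan_bins(file_sizes: List[int], target_bytes: int) -> List[List[int]]:
    """First-fit decreasing: group files into rewrite groups of at most target_bytes."""
    bins: List[List[int]] = []   # sizes of the files rewritten together
    totals: List[int] = []       # running byte total per bin
    for size in sorted(file_sizes, reverse=True):
        for i, total in enumerate(totals):
            if total + size <= target_bytes:
                bins[i].append(size)
                totals[i] += size
                break
        else:
            # No existing bin has room (or the file alone exceeds the target): open a new one
            bins.append([size])
            totals.append(size)
    return bins


MiB = 1024 * 1024
groups = plan_bins([10 * MiB] * 20, target_bytes=128 * MiB)
print([len(g) for g in groups])  # [12, 8]
```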

Scenario 6: Streaming Writes with Flink

-- Flink SQL: stream changes from a MySQL CDC source into Iceberg
CREATE TABLE mysql_source (
    id BIGINT,
    name STRING,
    updated_at TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'database-name' = 'mydb',
    'table-name' = 'users',
    'username' = 'root',
    'password' = 'pass'
);

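-- Continuously upsert into Iceberg. The Iceberg sink commits on Flink checkpoints,
-- so checkpointing must be enabled for written data to become visible.
SET 'execution.checkpointing.interval' = '60s';

-- The target should be a format-version 2 table with a primary key, so CDC updates
-- and deletes are written as equality deletes.
INSERT INTO iceberg_catalog.db.users /*+ OPTIONS('upsert-enabled'='true') */
SELECT * FROM mysql_source;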
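
A CDC source emits a changelog of inserts, update-before/update-after pairs, and deletes. The sketch below models the upsert semantics the sink aims for, keyed on the primary key; it is a hypothetical in-memory model, not the Iceberg Flink writer (which roughly records each replacement or delete as an equality delete on the key, plus a new row for upserts). The row-kind markers +I/-U/+U/-D follow Flink's changelog notation.

```python
from typing import Dict, List, Tuple


def apply_changelog(state: Dict[int, dict], changes: List[Tuple[str, dict]]) -> Dict[int, dict]:
    """Apply Flink-style row kinds to a table keyed by the 'id' primary key."""
    for kind, row in changes:
        key = row["id"]
        if kind in ("+I", "+U"):
            state[key] = row        # upsert: replace whatever row currently has this key
        elif kind == "-D":
            state.pop(key, None)    # delete by key
        elif kind == "-U":
            pass                    # update-before is redundant once +U replaces by key
        else:
            raise ValueError(f"unknown row kind: {kind}")
    return state


changes = [
    ("+I", {"id": 1, "name": "alice"}),
    ("+I", {"id": 2, "name": "bob"}),
    ("-U", {"id": 1, "name": "alice"}),
    ("+U", {"id": 1, "name": "alice2"}),
    ("-D", {"id": 2, "name": "bob"}),
]
print(apply_changelog({}, changes))  # {1: {'id': 1, 'name': 'alice2'}}
```

Because every write is keyed, replaying the same changelog (for example after a restart from a checkpoint) leaves the table in the same state.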

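Scenario 7: Multiple Engines on One Table

# The same Iceberg table can be read and written by different engines.
# Catalog names differ per engine (local, iceberg, iceberg_catalog below), but each
# must be configured to point at the same catalog/warehouse.

# Spark writes
spark.sql("INSERT INTO local.analytics.events VALUES (...)")

# Trino queries
# SELECT * FROM iceberg.analytics.events WHERE ...

# Flink appends as a stream
# INSERT INTO iceberg_catalog.analytics.events SELECT ...

# DuckDB analyzes (needs DuckDB's iceberg extension: INSTALL iceberg; LOAD iceberg;)
# SELECT * FROM iceberg_scan('s3://bucket/warehouse/analytics/events')

# Python (PyIceberg) reads
from pyiceberg.catalog import load_catalog
# With no properties passed, "local" must be defined in ~/.pyiceberg.yaml
# or through PYICEBERG_CATALOG__LOCAL__* environment variables
catalog = load_catalog("local")
table = catalog.load_table("analytics.events")
df = table.scan().to_pandas()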

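Scenario 8: Iceberg REST Catalog

# Use a REST catalog (the recommended way to share tables across engines)
from pyiceberg.catalog import load_catalog

catalog = load_catalog("rest", **{
    "type": "rest",
    "uri": "http://iceberg-rest-catalog:8181",
    "s3.endpoint": "http://minio:9000",   # MinIO as S3-compatible storage
    "s3.access-key-id": "admin",          # placeholder credentials
    "s3.secret-access-key": "password",
})

# Every engine connects to the same REST catalog, so they all see the
# same table metadata and commits stay consistent across engines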
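
Sharing one catalog keeps engines consistent because a commit is a compare-and-swap on the table's metadata pointer: a writer commits only if the pointer still references the version it started from; otherwise it re-reads and retries. A minimal sketch of that optimistic concurrency loop, using a hypothetical in-memory Catalog whose pointer is just a version number. Real catalogs also re-validate conflicts on retry, which is omitted here.

```python
import threading
from typing import Callable, Dict, Optional


class CommitFailedException(Exception):
    pass


class Catalog:
    """Hypothetical catalog: one current-metadata pointer per table, swapped atomically."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._pointers: Dict[str, int] = {}

    def current(self, table: str) -> Optional[int]:
        with self._lock:
            return self._pointers.get(table)

    def swap(self, table: str, expected: Optional[int], new: int) -> None:
        with self._lock:  # compare and set happen under one lock, so the swap is atomic
            if self._pointers.get(table) != expected:
                raise CommitFailedException(f"{table} changed since it was read")
            self._pointers[table] = new


def commit_with_retry(catalog: Catalog, table: str,
                      make_new: Callable[[Optional[int]], int], max_retries: int = 3) -> int:
    for _ in range(max_retries + 1):
        base = catalog.current(table)
        new = make_new(base)              # e.g. write a new metadata file based on `base`
        try:
            catalog.swap(table, base, new)
            return new
        except CommitFailedException:
            continue                      # another writer won the race: re-read and retry
    raise CommitFailedException(f"gave up on {table} after {max_retries} retries")


catalog = Catalog()
catalog.swap("analytics.events", None, 1)
stale_base = catalog.current("analytics.events")     # writer A reads version 1
catalog.swap("analytics.events", 1, 2)               # writer B commits version 2 first
try:
    catalog.swap("analytics.events", stale_base, 2)  # writer A's stale commit is rejected
except CommitFailedException as e:
    print("rejected:", e)
print(commit_with_retry(catalog, "analytics.events", lambda base: base + 1))  # 3
```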

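FAQ and Troubleshooting

Problem 1: Slow Query Planning

Cause: too many manifest files (every small write adds a new manifest).

Fix: rewrite (consolidate) the manifests.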

CALL system.rewrite_manifests('table');

Problem 2: Too Many Small Files

-- Compact small files toward a target file size
CALL system.rewrite_data_files(
    table => 'db.table',
    options => map('target-file-size-bytes', '134217728')  -- 128MB
);

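Problem 3: Storage Keeps Growing

-- Expire snapshots older than the given timestamp
CALL system.expire_snapshots('db.table', TIMESTAMP '2024-01-01');

-- Remove orphan files (by default only files older than 3 days, to protect in-flight writes)
CALL system.remove_orphan_files('db.table');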
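
What makes files deletable after expiration is reachability: a data file can only be removed once no retained snapshot references it. A simplified sketch follows; the expire_snapshots function here is hypothetical and ignores branches, tags, and manifest reuse that the real procedure accounts for, and its retain_last simply keeps the most recent snapshots.

```python
from typing import Dict, Set, Tuple


def expire_snapshots(
    snapshots: Dict[int, Tuple[int, Set[str]]],  # snapshot_id -> (timestamp_ms, data files)
    current_id: int,
    older_than_ms: int,
    retain_last: int,
) -> Tuple[Set[int], Set[str]]:
    """Return (snapshot IDs kept, data files no longer referenced by any kept snapshot)."""
    newest_first = sorted(snapshots, key=lambda sid: snapshots[sid][0], reverse=True)
    keep = set(newest_first[:retain_last]) | {current_id}
    keep |= {sid for sid, (ts, _) in snapshots.items() if ts >= older_than_ms}
    referenced = set().union(*(snapshots[sid][1] for sid in keep))
    all_files = set().union(*(files for _, files in snapshots.values()))
    return keep, all_files - referenced


snaps = {1: (1_000, {"a.parquet", "b.parquet"}),
         2: (2_000, {"b.parquet", "c.parquet"}),
         3: (3_000, {"c.parquet", "d.parquet"})}
print(expire_snapshots(snaps, current_id=3, older_than_ms=2_500, retain_last=1))
```

This is also why expired snapshots can no longer be used for time travel: the files they pointed to may already be gone.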

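Problem 4: Choosing a Catalog

| Catalog type | Best suited for |
|---|---|
| Hadoop | Local development and simple setups (relies on atomic rename, so it is not safe for concurrent writers on object stores such as S3) |
| Hive Metastore | Teams with existing Hive infrastructure |
| AWS Glue | The AWS ecosystem |
| REST Catalog | Sharing tables across multiple engines (recommended) |
| Nessie | Git-like versioning of data |
| Polaris | Apache Polaris, the open-source catalog from Snowflake (implements the REST catalog API) |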

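Problem 5: Interoperating with Delta Lake

-- Delta Lake 3.x UniForm (a Delta-side feature) writes Iceberg metadata alongside the
-- Delta log, so Iceberg readers can query a Delta table without copying data.
-- Delta Lake SQL, run against the Delta table:
ALTER TABLE delta_table SET TBLPROPERTIES (
    'delta.enableIcebergCompatV2' = 'true',
    'delta.universalFormat.enabledFormats' = 'iceberg'
);

-- Or migrate. The Iceberg snapshot procedure takes a Spark/Hive table (Parquet/ORC/Avro),
-- not a Delta path; converting a Delta table uses Iceberg's iceberg-delta-lake module instead.
CALL system.snapshot('db.source_table', 'db.iceberg_table');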

References

  • Official documentation: https://iceberg.apache.org/docs/latest/
  • GitHub: https://github.com/apache/iceberg
  • PyIceberg: https://py.iceberg.apache.org/
  • Iceberg spec: https://iceberg.apache.org/spec/
  • Iceberg blog: https://iceberg.apache.org/blogs/
  • Tabular (commercial Iceberg company, acquired by Databricks in 2024): https://tabular.io/
  • Slack community: https://iceberg.apache.org/community/