Apache Iceberg 完全指南¶
为什么要学 Apache Iceberg¶
最开放的表格式标准:Apache Iceberg 被 AWS、Google、Apple、Netflix、Snowflake 等公司广泛采用,是数据湖表格式的事实标准。它不绑定任何特定引擎或供应商,Spark、Flink、Trino、Presto、Hive、Dremio 都原生支持。
分区演进——无痛修改分区策略:传统 Hive 表改分区意味着重写所有数据。Iceberg 的分区演进(Partition Evolution)可以在不重写数据的情况下改变分区方式,这在大规模数据湖中是变革性的。
隐式分区——告别分区列管理:Iceberg 的隐式分区让你在建表时定义分区转换(如按月分区),查询时不需要写分区列过滤条件,引擎自动利用分区进行优化。
快照隔离与并发控制:基于快照的读取提供一致性视图,读写互不阻塞。多个写入者通过乐观并发控制安全地同时写入。
PB 级数据的可靠管理:Iceberg 被设计用来管理百 PB 级数据。其元数据结构(manifest 文件 + manifest list)让查询计划在毫秒级完成,即使表有百万个文件。
核心概念详解¶
Iceberg 是什么(白话解释)¶
传统数据湖就是"一堆文件放在 S3/HDFS 上"。你想查数据得知道文件在哪、格式是什么、怎么分区的。换了查询引擎就得重新配置。
Iceberg 就像一个"表目录":它在文件之上建立了一层元数据,记录着"这个表有哪些文件、每个文件有什么数据、数据的统计信息(最大值最小值)"。任何引擎只要读懂这个目录,就能高效查询数据。
元数据架构¶
┌────────────────────────────────────────────┐
│ Catalog(目录) │
│ 记录表的当前 metadata 文件位置 │
└────────────────┬───────────────────────────┘
↓
┌────────────────────────────────────────────┐
│ Metadata File (metadata/v3.metadata.json) │
│ - 表 Schema │
│ - 分区规则 │
│ - 当前快照 ID │
│ - 快照列表 │
└────────────────┬───────────────────────────┘
↓
┌────────────────────────────────────────────┐
│ Manifest List (snap-xxx-xxx.avro) │
│ - 列出所有 Manifest 文件 │
│ - 每个 manifest 的分区范围统计 │
└────────────────┬───────────────────────────┘
↓
┌────────────────────────────────────────────┐
│ Manifest File (xxx-m0.avro) │
│ - 列出数据文件路径 │
│ - 每个文件的列统计(min/max/null_count) │
│ - 文件大小、行数 │
└────────────────┬───────────────────────────┘
↓
┌────────────────────────────────────────────┐
│ Data Files (data/*.parquet) │
│ - 实际数据(Parquet/ORC/Avro) │
└────────────────────────────────────────────┘
隐式分区(Hidden Partitioning)¶
| Hive 分区方式 | Iceberg 隐式分区 |
|---|---|
| 需要额外分区列 | 自动从数据列计算 |
WHERE year=2024 AND month=12 | WHERE event_time > '2024-12-01' |
| 改分区需重写数据 | 分区演进,无需重写 |
| 用户必须知道分区策略 | 透明,引擎自动优化 |
-- Iceberg 建表时定义分区转换
CREATE TABLE events (
event_id BIGINT,
event_time TIMESTAMP,
user_id BIGINT,
event_type STRING,
payload STRING
) USING iceberg
PARTITIONED BY (
days(event_time), -- 按天分区(从 event_time 自动计算)
bucket(16, user_id) -- 按 user_id 哈希分桶
);
-- 查询时不需要写分区条件,Iceberg 自动利用分区
SELECT * FROM events WHERE event_time > '2024-12-01';
-- Iceberg 自动只扫描 12 月的分区
分区转换函数¶
| 函数 | 说明 | 示例 |
|---|---|---|
year(ts) | 按年 | PARTITIONED BY (year(event_time)) |
month(ts) | 按月 | PARTITIONED BY (month(event_time)) |
day(ts) | 按天 | PARTITIONED BY (day(event_time)) |
hour(ts) | 按小时 | PARTITIONED BY (hour(event_time)) |
bucket(n, col) | 哈希分桶 | PARTITIONED BY (bucket(16, user_id)) |
truncate(n, col) | 截断 | PARTITIONED BY (truncate(10, city)) |
identity(col) | 原值 | PARTITIONED BY (identity(country)) |
安装与配置¶
PySpark + Iceberg¶
from pyspark.sql import SparkSession
spark = (SparkSession.builder
.appName("IcebergDemo")
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1")
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.local.type", "hadoop")
.config("spark.sql.catalog.local.warehouse", "/tmp/iceberg-warehouse")
.getOrCreate())
PyIceberg(独立 Python 库)¶
from pyiceberg.catalog import load_catalog
# 本地文件系统 catalog
catalog = load_catalog("local", **{
"type": "sql",
"uri": "sqlite:///iceberg_catalog.db",
"warehouse": "/tmp/iceberg-warehouse",
})
# AWS Glue catalog
catalog = load_catalog("glue", **{
"type": "glue",
"s3.region": "us-east-1",
})
# REST catalog
catalog = load_catalog("rest", **{
"type": "rest",
"uri": "http://localhost:8181",
})
Trino + Iceberg¶
# etc/catalog/iceberg.properties
connector.name=iceberg
iceberg.catalog.type=hive_metastore
hive.metastore.uri=thrift://localhost:9083
Flink + Iceberg¶
CREATE CATALOG iceberg_catalog WITH (
'type' = 'iceberg',
'catalog-type' = 'hadoop',
'warehouse' = 's3://my-bucket/warehouse'
);
快速上手:5 分钟最小示例¶
使用 PyIceberg + DuckDB¶
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, LongType, TimestampType
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform
import pyarrow as pa
# 1. 创建 catalog
catalog = SqlCatalog("demo", uri="sqlite:///demo_catalog.db", warehouse="/tmp/iceberg-demo")
# 2. 创建 namespace
catalog.create_namespace("analytics")
# 3. 定义 schema
schema = Schema(
NestedField(1, "event_id", LongType(), required=True),
NestedField(2, "user_id", LongType(), required=True),
NestedField(3, "event_type", StringType()),
NestedField(4, "event_time", TimestampType()),
)
# 4. 定义分区
partition_spec = PartitionSpec(
PartitionField(source_id=4, field_id=1000, transform=DayTransform(), name="event_day")
)
# 5. 创建表
table = catalog.create_table(
identifier="analytics.events",
schema=schema,
partition_spec=partition_spec,
)
# 6. 写入数据
import pyarrow as pa
from datetime import datetime
data = pa.table({
"event_id": [1, 2, 3, 4, 5],
"user_id": [101, 102, 101, 103, 102],
"event_type": ["click", "view", "purchase", "click", "view"],
"event_time": [
datetime(2024, 12, 1, 10, 0),
datetime(2024, 12, 1, 11, 0),
datetime(2024, 12, 2, 9, 0),
datetime(2024, 12, 2, 14, 0),
datetime(2024, 12, 3, 8, 0),
],
})
table.append(data)
# 7. 查询数据
scan = table.scan()
df = scan.to_pandas()
print(df)
# 8. 带过滤条件的扫描
from pyiceberg.expressions import GreaterThanOrEqual
scan_filtered = table.scan(
row_filter=GreaterThanOrEqual("event_id", 3)
)
print(scan_filtered.to_pandas())
# 9. 查看快照
for snapshot in table.history():
print(f"Snapshot: {snapshot.snapshot_id}, Time: {snapshot.timestamp_ms}")
进阶用法¶
场景一:分区演进¶
-- 初始分区:按月
CREATE TABLE events (...) USING iceberg
PARTITIONED BY (month(event_time));
-- 数据量增长后,改为按天分区(无需重写历史数据!)
ALTER TABLE events
ADD PARTITION FIELD day(event_time);
-- 删除旧的按月分区规则
ALTER TABLE events
DROP PARTITION FIELD month(event_time);
-- 历史数据仍按月分区,新数据按天分区
-- Iceberg 会同时利用两种分区进行查询优化
场景二:Schema 演进¶
-- 添加列
ALTER TABLE events ADD COLUMNS (
browser STRING COMMENT '浏览器类型',
country STRING COMMENT '国家'
);
-- 重命名列
ALTER TABLE events RENAME COLUMN browser TO user_agent;
-- 删除列
ALTER TABLE events DROP COLUMN country;
-- 修改列类型(允许的升级:int→bigint, float→double)
ALTER TABLE events ALTER COLUMN user_id TYPE bigint;
-- 重排列顺序
ALTER TABLE events ALTER COLUMN user_agent AFTER event_type;
场景三:快照隔离与时间旅行¶
-- 查看快照历史
SELECT * FROM local.analytics.events.history;
-- 按快照 ID 查询
SELECT * FROM local.analytics.events VERSION AS OF 1234567890;
-- 按时间戳查询
SELECT * FROM local.analytics.events TIMESTAMP AS OF '2024-12-01 00:00:00';
-- 回滚到指定快照
CALL local.system.rollback_to_snapshot('analytics.events', 1234567890);
-- Cherry-pick 快照
CALL local.system.cherrypick_snapshot('analytics.events', 1234567891);
场景四:MERGE(Upsert / SCD Type 2)¶
-- 标准 Upsert
MERGE INTO events AS target
USING updates AS source
ON target.event_id = source.event_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
-- SCD Type 2(缓慢变化维度)
MERGE INTO dim_customer AS target
USING staged_updates AS source
ON target.customer_id = source.customer_id AND target.is_current = true
WHEN MATCHED AND target.address != source.address THEN
UPDATE SET is_current = false, end_date = current_date()
WHEN NOT MATCHED THEN
INSERT (customer_id, name, address, is_current, start_date, end_date)
VALUES (source.customer_id, source.name, source.address, true, current_date(), null);
场景五:表维护操作¶
-- 合并小文件(compaction)
CALL local.system.rewrite_data_files(
table => 'analytics.events',
strategy => 'sort',
sort_order => 'event_time ASC NULLS LAST'
);
-- 清理过期快照
CALL local.system.expire_snapshots(
table => 'analytics.events',
older_than => TIMESTAMP '2024-11-01 00:00:00',
retain_last => 5
);
-- 删除孤立文件
CALL local.system.remove_orphan_files(
table => 'analytics.events',
older_than => TIMESTAMP '2024-11-01 00:00:00'
);
-- 重写 manifest 文件
CALL local.system.rewrite_manifests('analytics.events');
场景六:Flink CDC 流式写入¶
-- Flink SQL:从 MySQL CDC 流式写入 Iceberg
CREATE TABLE mysql_source (
id BIGINT,
name STRING,
updated_at TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'database-name' = 'mydb',
'table-name' = 'users',
'username' = 'root',
'password' = 'pass'
);
-- 流式 upsert 到 Iceberg
INSERT INTO iceberg_catalog.db.users
SELECT * FROM mysql_source;
场景七:多引擎访问同一表¶
# 同一个 Iceberg 表可以被不同引擎读写
# Spark 写入
spark.sql("INSERT INTO local.analytics.events VALUES (...)")
# Trino 查询
# SELECT * FROM iceberg.analytics.events WHERE ...
# Flink 流式追加
# INSERT INTO iceberg_catalog.analytics.events SELECT ...
# DuckDB 分析
# SELECT * FROM iceberg_scan('s3://bucket/warehouse/analytics/events')
# Python (PyIceberg) 读取
from pyiceberg.catalog import load_catalog
catalog = load_catalog("local")
table = catalog.load_table("analytics.events")
df = table.scan().to_pandas()
场景八:Iceberg REST Catalog¶
# 使用 REST Catalog(推荐的多引擎共享方式)
from pyiceberg.catalog import load_catalog
catalog = load_catalog("rest", **{
"type": "rest",
"uri": "http://iceberg-rest-catalog:8181",
"s3.endpoint": "http://minio:9000",
"s3.access-key-id": "admin",
"s3.secret-access-key": "password",
})
# 所有引擎连接同一个 REST Catalog
# 共享元数据,保证一致性
常见问题与排错¶
问题一:查询计划耗时长¶
原因:manifest 文件过多(每次小写入都创建新 manifest)。
解决:
问题二:小文件过多¶
-- 合并小文件
CALL system.rewrite_data_files(
table => 'db.table',
options => map('target-file-size-bytes', '134217728') -- 128MB
);
问题三:存储空间持续增长¶
-- 清理过期快照
CALL system.expire_snapshots('db.table', TIMESTAMP '2024-01-01');
-- 删除孤立文件
CALL system.remove_orphan_files('db.table');
问题四:Catalog 选择¶
| Catalog 类型 | 适用场景 |
|---|---|
| Hadoop | 本地开发/简单场景 |
| Hive Metastore | 已有 Hive 基础设施 |
| AWS Glue | AWS 生态 |
| REST Catalog | 多引擎共享(推荐) |
| Nessie | Git-like 数据版本管理 |
| Polaris | Snowflake 开源 catalog |
问题五:与 Delta Lake 互操作¶
-- Iceberg 支持读取 Delta Lake 表(通过 UniForm)
-- Delta Lake 3.x 可以生成 Iceberg 兼容的元数据
-- 或使用工具迁移
CALL system.snapshot('delta_table_path', 'iceberg_table');
参考资源¶
- 官方文档:https://iceberg.apache.org/docs/latest/
- GitHub:https://github.com/apache/iceberg
- PyIceberg:https://py.iceberg.apache.org/
- Iceberg Spec:https://iceberg.apache.org/spec/
- Iceberg Blog:https://iceberg.apache.org/blogs/
- Tabular(Iceberg 商业公司):https://tabular.io/
- Slack 社区:https://iceberg.apache.org/community/