跳转至

Ibis 统一数据框 API 完全指南

为什么要学 Ibis

  1. 一套代码跑遍所有引擎:Ibis 提供统一的 Python DataFrame API,同一段代码可以在 DuckDB、PostgreSQL、Spark、BigQuery、Snowflake、Polars 等 20+ 后端上运行。写一次逻辑,换个后端参数就能从笔记本 DuckDB 无缝切换到生产 BigQuery。

  2. 延迟执行(Lazy Evaluation):Ibis 表达式不会立即执行,而是构建一个逻辑计划。这使得后端引擎可以做全局优化——谓词下推、列裁剪、连接重排序等。你写的"Pandas 风格"代码会被翻译成高效的 SQL 或执行计划。

  3. Pandas 开发者零门槛上手:如果你会 Pandas,你就会 Ibis。API 设计风格相似,但解决了 Pandas 的扩展性问题。不再有内存不足的烦恼,因为计算在数据库/引擎中进行,不需要把数据全部加载到 Python 内存。

  4. SQL 和 DataFrame 自由切换:Ibis 可以将 DataFrame 操作翻译成 SQL,你可以随时查看生成的 SQL。反过来,你也可以直接写 SQL 然后继续用 Ibis API 处理结果。两种范式无缝混用。

  5. 现代数据栈的粘合剂:作为 Voltron Data 的开源项目(Apache Arrow 的幕后公司),Ibis 与 Apache Arrow、Substrait 深度集成,是现代数据分析工具链的核心枢纽。


核心概念详解

Ibis 是什么(白话解释)

想象你在不同餐厅吃饭:中餐厅用中文点菜,日本料理用日语,西餐用英语。很麻烦对吧?

Ibis 就像一个"万能翻译"。你用一种语言(Ibis Python API)描述你想要什么数据操作,Ibis 自动翻译成目标引擎的语言(SQL、Spark API 等)。换了家"餐厅"(数据库/引擎),你的"点菜方式"(代码)不需要改。

核心架构

Python 代码(Ibis API)
Ibis 表达式树(逻辑计划)
后端编译器(翻译为目标引擎语言)
┌─────────┬──────────┬──────────┬──────────┐
│ DuckDB  │ Postgres │ Spark    │ BigQuery │ ...
│ (SQL)   │ (SQL)    │ (Spark)  │ (SQL)    │
└─────────┴──────────┴──────────┴──────────┘
执行结果 → Arrow Table / Pandas / Polars

支持的后端

后端类型适用场景
DuckDB嵌入式本地分析、开发测试
Polars嵌入式高性能本地分析
DataFusion嵌入式Rust 引擎
PostgreSQL关系型OLTP + 分析
MySQL关系型Web 应用数据
SQLite嵌入式轻量级
Spark分布式大数据集
Trino分布式联邦查询
BigQuery云数据仓库GCP 大数据
Snowflake云数据仓库企业数据仓库
ClickHouseOLAP实时分析
Impala分布式Hadoop 生态
Oracle关系型企业级
MS SQL Server关系型Windows 生态

Ibis vs Pandas vs Polars 对比

特性IbisPandasPolars
执行模式延迟执行立即执行延迟+立即
数据位置在引擎中内存中内存中
最大数据量取决于后端(TB级)受限于RAM受限于RAM
后端切换20+ 后端仅Pandas仅Polars
SQL 生成支持不支持不支持
类型系统强类型弱(object列)强类型
NULL 处理统一(引擎处理)NaN/None混乱统一None
API风格类Pandas+链式混合链式表达式
并行执行取决于后端单线程多线程
适合角色数据工程师/分析师数据分析师性能敏感分析

安装与配置

基本安装

# 安装 Ibis 核心 + DuckDB 后端(推荐起步)
pip install 'ibis-framework[duckdb]'

# 安装多个后端
pip install 'ibis-framework[duckdb,postgres,bigquery]'

# 所有后端
pip install 'ibis-framework[all]'

# 特定后端
pip install 'ibis-framework[snowflake]'
pip install 'ibis-framework[spark]'
pip install 'ibis-framework[clickhouse]'
pip install 'ibis-framework[polars]'

# 可视化支持
pip install 'ibis-framework[visualization]'

连接后端

import ibis

# DuckDB(默认后端,内存模式)
con = ibis.duckdb.connect()

# DuckDB(文件模式)
con = ibis.duckdb.connect("my_data.ddb")

# PostgreSQL
con = ibis.postgres.connect(
    host="localhost",
    port=5432,
    database="mydb",
    user="user",
    password="pass",
)

# BigQuery
con = ibis.bigquery.connect(
    project_id="my-project",
    dataset_id="my_dataset",
)

# Snowflake
con = ibis.snowflake.connect(
    account="xxx.us-east-1",
    user="user",
    password="pass",
    database="DB",
    schema="PUBLIC",
    warehouse="COMPUTE_WH",
)

# Polars
con = ibis.polars.connect()

# 也可以用统一的 connect 函数
con = ibis.connect("duckdb://")
con = ibis.connect("postgres://user:pass@localhost/db")

交互模式设置

# 启用交互模式(在 Jupyter 中自动显示结果)
ibis.options.interactive = True

# 设置默认后端
ibis.set_backend("duckdb")

快速上手:5 分钟最小示例

import ibis

# 启用交互模式
ibis.options.interactive = True

# 连接 DuckDB(内存模式)
con = ibis.duckdb.connect()

# 从 CSV 读取数据
t = con.read_csv("sales.csv")
# 或直接用内存数据
t = ibis.memtable({
    "product": ["苹果", "香蕉", "橙子", "苹果", "香蕉", "橙子"],
    "region": ["东", "东", "东", "西", "西", "西"],
    "sales": [100, 80, 120, 90, 110, 95],
    "date": ["2024-01", "2024-01", "2024-01", "2024-01", "2024-01", "2024-01"],
})

# 基本查询
print(t)

# 筛选
high_sales = t.filter(t.sales > 90)
print(high_sales)

# 分组聚合
summary = (
    t
    .group_by("product")
    .agg(
        total_sales=t.sales.sum(),
        avg_sales=t.sales.mean(),
        count=t.count(),
    )
    .order_by(ibis.desc("total_sales"))
)
print(summary)

# 查看生成的 SQL
print(ibis.to_sql(summary))

# 转换为 Pandas
df = summary.to_pandas()

# 转换为 Polars
pl_df = summary.to_polars()

# 转换为 PyArrow
arrow_table = summary.to_pyarrow()

进阶用法

场景一:复杂数据转换管道

import ibis
from ibis import _

con = ibis.duckdb.connect()
orders = con.read_parquet("orders.parquet")
customers = con.read_parquet("customers.parquet")

# 链式操作:筛选 → 连接 → 计算 → 聚合 → 排序
result = (
    orders
    .filter(_.order_date >= "2024-01-01")
    .filter(_.status == "completed")
    .join(customers, orders.customer_id == customers.id)
    .mutate(
        order_month=_.order_date.truncate("M"),
        revenue=_.quantity * _.unit_price,
        discount_amount=ibis.ifelse(_.discount > 0, _.quantity * _.unit_price * _.discount, 0),
    )
    .group_by(["order_month", "customer_segment"])
    .agg(
        total_revenue=_.revenue.sum(),
        total_discount=_.discount_amount.sum(),
        order_count=_.count(),
        unique_customers=_.customer_id.nunique(),
        avg_order_value=_.revenue.mean(),
    )
    .mutate(
        net_revenue=_.total_revenue - _.total_discount,
        revenue_per_customer=_.total_revenue / _.unique_customers,
    )
    .order_by(["order_month", ibis.desc("total_revenue")])
)

# 查看 SQL
print(ibis.to_sql(result))

# 执行
result.to_pandas()

场景二:窗口函数

import ibis
from ibis import _

# 排名
ranked = (
    sales_table
    .mutate(
        rank=ibis.rank().over(
            ibis.window(group_by="category", order_by=ibis.desc("amount"))
        ),
        running_total=_.amount.sum().over(
            ibis.window(group_by="category", order_by="date", following=0)
        ),
        pct_of_category=_.amount / _.amount.sum().over(
            ibis.window(group_by="category")
        ),
        moving_avg=_.amount.mean().over(
            ibis.window(order_by="date", preceding=6, following=0)
        ),
    )
)

场景三:后端无缝切换

import ibis

def analyze_sales(con):
    """同一段分析代码,可以在任何后端运行"""
    t = con.table("sales")

    return (
        t
        .filter(t.year == 2024)
        .group_by("product_category")
        .agg(
            revenue=t.amount.sum(),
            orders=t.count(),
            avg_order=t.amount.mean(),
        )
        .order_by(ibis.desc("revenue"))
    )

# 开发环境:DuckDB
dev_con = ibis.duckdb.connect("dev.ddb")
dev_result = analyze_sales(dev_con)

# 生产环境:BigQuery
prod_con = ibis.bigquery.connect(project_id="prod-project", dataset_id="analytics")
prod_result = analyze_sales(prod_con)

# 同样的代码,不同的后端!

场景四:UDF(用户定义函数)

import ibis
from ibis import udf

# 标量 UDF
@udf.scalar.builtin
def levenshtein(a: str, b: str) -> int:
    """使用后端内置的 Levenshtein 距离函数"""
    ...

# Python UDF
@udf.scalar.python
def classify_amount(amount: float) -> str:
    if amount > 1000:
        return "high"
    elif amount > 100:
        return "medium"
    else:
        return "low"

# 使用 UDF
result = t.mutate(
    amount_class=classify_amount(t.amount),
    name_distance=levenshtein(t.name, "target"),
)

场景五:与 ML 管道集成

import ibis
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier

con = ibis.duckdb.connect()
raw_data = con.read_parquet("features.parquet")

# 用 Ibis 做特征工程(在数据库引擎中执行)
features = (
    raw_data
    .mutate(
        log_amount=_.amount.log(),
        days_since_signup=(ibis.now() - _.signup_date).cast("int32"),
        is_weekend=_.order_date.day_of_week.name().isin(["Saturday", "Sunday"]),
    )
    .drop("raw_column_1", "raw_column_2")  # 删除不需要的列
    .dropna(subset=["target"])  # 删除缺失标签
)

# 转换为 Pandas 后送入 sklearn
df = features.to_pandas()
X = df.drop("target", axis=1).select_dtypes(include=["number"])
y = df["target"]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
model = RandomForestClassifier()
model.fit(X_train, y_train)
print(f"Accuracy: {model.score(X_test, y_test):.3f}")

场景六:混合 SQL 与 Ibis API

import ibis

con = ibis.duckdb.connect()

# 先用 SQL 做初步查询
raw = con.sql("""
    SELECT
        customer_id,
        DATE_TRUNC('month', order_date) as month,
        SUM(amount) as monthly_spend
    FROM orders
    WHERE order_date >= '2024-01-01'
    GROUP BY 1, 2
""")

# 然后用 Ibis API 继续处理
result = (
    raw
    .mutate(
        spend_rank=ibis.rank().over(
            ibis.window(group_by="customer_id", order_by="monthly_spend")
        )
    )
    .filter(_.spend_rank <= 3)  # 每个客户消费最高的3个月
)

# 查看最终 SQL
print(ibis.to_sql(result))

场景七:处理地理空间数据

import ibis

con = ibis.duckdb.connect()
con.raw_sql("INSTALL spatial; LOAD spatial;")

locations = con.read_parquet("locations.parquet")

# 地理空间操作
result = (
    locations
    .mutate(
        point=ibis.literal("POINT(116.4 39.9)").cast("geometry"),
    )
    # DuckDB spatial 扩展支持
)

场景八:数据质量检查

import ibis

def data_quality_report(table):
    """通用数据质量报告"""
    cols = table.columns

    checks = []
    for col in cols:
        col_expr = table[col]
        checks.append({
            "column": col,
            "null_count": col_expr.isnull().sum().execute(),
            "null_pct": (col_expr.isnull().sum() / table.count() * 100).execute(),
            "distinct_count": col_expr.nunique().execute(),
        })

    # 数值列统计
    numeric_cols = [c for c in cols if table[c].type().is_numeric()]
    for col in numeric_cols:
        for check in checks:
            if check["column"] == col:
                check["min"] = table[col].min().execute()
                check["max"] = table[col].max().execute()
                check["mean"] = table[col].mean().execute()

    return checks

# 使用
con = ibis.duckdb.connect()
t = con.read_csv("data.csv")
report = data_quality_report(t)
for item in report:
    print(item)

常见问题与排错

问题一:表达式不执行/没有输出

原因:Ibis 默认是延迟执行的。

# 不会执行:
result = t.filter(t.x > 10)  # 只构建了表达式

# 执行方式一:调用 .execute() 或 .to_pandas()
df = result.to_pandas()

# 执行方式二:启用交互模式
ibis.options.interactive = True
result  # 在 Jupyter 中会自动显示

问题二:后端不支持某个操作

# 某些操作不是所有后端都支持
# 例如 regexp 在 SQLite 上可能不可用

# 检查后端支持的操作
print(con.name)  # 当前后端名称

# 解决方案:用 SQL 直接写
result = con.sql("SELECT REGEXP_MATCHES(col, 'pattern') FROM table")

问题三:数据类型不匹配

# Ibis 有自己的类型系统
import ibis.expr.datatypes as dt

# 显式类型转换
t = t.mutate(
    amount=t.amount.cast("float64"),
    date=t.date_str.cast("date"),
    flag=t.flag.cast("boolean"),
)

# 查看列类型
print(t.schema())

问题四:如何查看生成的 SQL

# 方法一:ibis.to_sql()
print(ibis.to_sql(expression))

# 方法二:.compile()
print(expression.compile())

# 对于调试非常有用

问题五:内存不足

# Ibis 的优势就是数据不需要全部在内存中
# 确保使用合适的后端

# DuckDB 可以处理大于内存的数据(spill to disk)
con = ibis.duckdb.connect("my.ddb")

# 避免过早 .to_pandas(),尽量在 Ibis 中完成计算
# 不好:
df = huge_table.to_pandas()  # 可能 OOM
result = df.groupby("col").sum()

# 好:
result = huge_table.group_by("col").agg(total=_.value.sum())
small_df = result.to_pandas()  # 聚合后的小结果再转 Pandas

问题六:Deferred 表达式(_ 语法)

from ibis import _

# _ 是 ibis.deferred 的简写,代表"当前表"
# 在链式操作中特别有用

result = (
    t
    .filter(_.amount > 100)      # _ 代表 t
    .mutate(doubled=_.amount * 2) # _ 仍然代表当前管道中的表
    .select(_.product, _.doubled)
)

# 等价于:
result = (
    t
    .filter(t.amount > 100)
    .mutate(doubled=t.amount * 2)
    .select("product", "doubled")
)

参考资源

  • 官方文档:https://ibis-project.org/
  • GitHub:https://github.com/ibis-project/ibis
  • 教程:https://ibis-project.org/tutorials/
  • 后端支持列表:https://ibis-project.org/backends/
  • API 参考:https://ibis-project.org/reference/
  • Ibis Blog:https://ibis-project.org/blog/
  • Voltron Data(Ibis 母公司):https://voltrondata.com/
  • Zulip 社区:https://ibis-project.zulipchat.com/