跳转至

petl — 轻量级 Python ETL 数据抽取转换加载工具


一句话说明

petl 用链式操作处理表格数据,像搭积木一样拼接 CSV/数据库读取、过滤、转换、写出,内存占用极小(流式处理),适合小团队快速做 ETL 管道。


安装与配置

# pip 安装
pip install petl                 # 当前版本 1.7+

# 数据库支持(可选)
pip install petl SQLAlchemy      # 连数据库需要

# 验证
python -c "import petl; print(petl.__version__)"

核心用法

读取数据

import petl as etl               # 惯例用 etl 别名

# 读 CSV 文件(流式,不全量加载到内存)
table = etl.fromcsv("data.csv")  # 返回 Table 对象(懒加载)
print(etl.look(table))           # 预览前 5 行

# 读 JSON
table_json = etl.fromjson("data.json")

# 读 SQLite
import sqlite3
conn = sqlite3.connect("db.sqlite")
table_db = etl.fromsqlite3(conn, "SELECT * FROM users")

基本转换操作

# 管道式链式操作(像 Unix 管道)
result = (
    etl.fromcsv("patients.csv")           # 1. 读入数据
    .rename({"PatientID": "id",           # 2. 重命名列
              "Age": "age"})
    .addfield("age_group",                 # 3. 添加新列
        lambda row: "老年" if row["age"] >= 60 else "中青年")
    .select("{age} > 18")                 # 4. 过滤:只保留成年人
    .cutout("备注")                        # 5. 删除不需要的列
    .sort("age")                          # 6. 按年龄排序
)

# 此时还没真正处理,只是构建了管道!
etl.tocsv(result, "output.csv")          # 触发执行并写出

常用操作

# 选列
t2 = etl.cut(table, "id", "name", "age")     # 只保留这几列

# 过滤行
t3 = etl.select(table, "{age} > 18")         # 字符串表达式
t3 = etl.select(table, lambda row: row["age"] > 18)  # lambda 方式

# 添加列
t4 = etl.addfield(table, "bmi",
    lambda row: row["weight"] / (row["height"]/100)**2)

# 合并两个表(类似 SQL JOIN)
t5 = etl.join(table_a, table_b, key="id")    # INNER JOIN

# 追加(纵向合并)
t6 = etl.cat(table_a, table_b)               # UNION ALL

实战案例

CSV 数据清洗流水线

import petl as etl

# 清洗医疗数据的完整管道
pipeline = (
    etl.fromcsv("raw_data.csv", encoding="gbk")       # 读 GBK 编码
    .rename({"病人编号": "pid", "年龄": "age"})        # 重命名
    .convert("age", int)                               # 类型转换为整数
    .select("{age} >= 18 and {age} <= 80")             # 过滤年龄范围
    .addfield("age_group", lambda r:                   # 分年龄段
        "老年" if r["age"] >= 60
        else "中年" if r["age"] >= 40
        else "青年")
    .sort("age")                                       # 排序
)

# 写出到多种格式
etl.tocsv(pipeline, "cleaned.csv")                    # 写 CSV
etl.toxlsx(pipeline, "cleaned.xlsx", "Sheet1")        # 写 Excel
etl.tojson(pipeline, "cleaned.json")                  # 写 JSON

# 直接写入数据库
import sqlite3
conn = sqlite3.connect("output.db")
etl.tosqlite3(pipeline, conn, "patients", create=True)

常见报错与解决

报错原因解决
UnicodeDecodeErrorCSV 编码不对encoding="gbk""utf-8-sig"
FieldSelectionError列名不存在etl.look() 检查列名
petl.errors.DuplicateKeyErrorjoin 时键值重复etl.leftjoin() 或检查主键唯一性
内存溢出调用了 list()petl 是流式的,不要转 list,直接写出

速查表

操作代码
读 CSVetl.fromcsv("f.csv")
预览etl.look(t)
选列etl.cut(t, "col1", "col2")
过滤etl.select(t, "{age}>18")
重命名etl.rename(t, {"old": "new"})
添加列etl.addfield(t, "col", lambda r: ...)
类型转换etl.convert(t, "col", int)
排序etl.sort(t, "col")
JOINetl.join(t1, t2, key="id")
写 CSVetl.tocsv(t, "out.csv")