petl — 轻量级 Python ETL 数据抽取转换加载工具
一句话说明
petl 用链式操作处理表格数据,像搭积木一样拼接 CSV/数据库读取、过滤、转换、写出,内存占用极小(流式处理),适合小团队快速做 ETL 管道。
安装与配置
# pip 安装
pip install petl # 当前版本 1.7+
# 数据库支持(可选)
pip install petl SQLAlchemy # 连数据库需要
# 验证
python -c "import petl; print(petl.__version__)"
核心用法
读取数据
import petl as etl # 惯例用 etl 别名
# 读 CSV 文件(流式,不全量加载到内存)
table = etl.fromcsv("data.csv") # 返回 Table 对象(懒加载)
print(etl.look(table)) # 预览前 5 行
# 读 JSON
table_json = etl.fromjson("data.json")
# 读 SQLite
import sqlite3
conn = sqlite3.connect("db.sqlite")
table_db = etl.fromsqlite3(conn, "SELECT * FROM users")
基本转换操作
# 管道式链式操作(像 Unix 管道)
result = (
etl.fromcsv("patients.csv") # 1. 读入数据
.rename({"PatientID": "id", # 2. 重命名列
"Age": "age"})
.addfield("age_group", # 3. 添加新列
lambda row: "老年" if row["age"] >= 60 else "中青年")
.select("{age} > 18") # 4. 过滤:只保留成年人
.cutout("备注") # 5. 删除不需要的列
.sort("age") # 6. 按年龄排序
)
# 此时还没真正处理,只是构建了管道!
etl.tocsv(result, "output.csv") # 触发执行并写出
常用操作
# 选列
t2 = etl.cut(table, "id", "name", "age") # 只保留这几列
# 过滤行
t3 = etl.select(table, "{age} > 18") # 字符串表达式
t3 = etl.select(table, lambda row: row["age"] > 18) # lambda 方式
# 添加列
t4 = etl.addfield(table, "bmi",
lambda row: row["weight"] / (row["height"]/100)**2)
# 合并两个表(类似 SQL JOIN)
t5 = etl.join(table_a, table_b, key="id") # INNER JOIN
# 追加(纵向合并)
t6 = etl.cat(table_a, table_b) # UNION ALL
实战案例
CSV 数据清洗流水线
import petl as etl
# 清洗医疗数据的完整管道
pipeline = (
etl.fromcsv("raw_data.csv", encoding="gbk") # 读 GBK 编码
.rename({"病人编号": "pid", "年龄": "age"}) # 重命名
.convert("age", int) # 类型转换为整数
.select("{age} >= 18 and {age} <= 80") # 过滤年龄范围
.addfield("age_group", lambda r: # 分年龄段
"老年" if r["age"] >= 60
else "中年" if r["age"] >= 40
else "青年")
.sort("age") # 排序
)
# 写出到多种格式
etl.tocsv(pipeline, "cleaned.csv") # 写 CSV
etl.toxlsx(pipeline, "cleaned.xlsx", "Sheet1") # 写 Excel
etl.tojson(pipeline, "cleaned.json") # 写 JSON
# 直接写入数据库
import sqlite3
conn = sqlite3.connect("output.db")
etl.tosqlite3(pipeline, conn, "patients", create=True)
常见报错与解决
| 报错 | 原因 | 解决 |
|---|
UnicodeDecodeError | CSV 编码不对 | 加 encoding="gbk" 或 "utf-8-sig" |
FieldSelectionError | 列名不存在 | 先 etl.look() 检查列名 |
petl.errors.DuplicateKeyError | join 时键值重复 | 用 etl.leftjoin() 或检查主键唯一性 |
| 内存溢出 | 调用了 list() | petl 是流式的,不要转 list,直接写出 |
速查表
| 操作 | 代码 |
|---|
| 读 CSV | etl.fromcsv("f.csv") |
| 预览 | etl.look(t) |
| 选列 | etl.cut(t, "col1", "col2") |
| 过滤 | etl.select(t, "{age}>18") |
| 重命名 | etl.rename(t, {"old": "new"}) |
| 添加列 | etl.addfield(t, "col", lambda r: ...) |
| 类型转换 | etl.convert(t, "col", int) |
| 排序 | etl.sort(t, "col") |
| JOIN | etl.join(t1, t2, key="id") |
| 写 CSV | etl.tocsv(t, "out.csv") |