420_ORM框架使用与优化¶
一句话说明¶
ORM(对象关系映射)让你用Python类操作数据库表,不用写SQL,但要小心N+1查询等性能陷阱。
核心知识点¶
ORM优缺点¶
| 方面 | 优点 | 缺点 |
|---|---|---|
| 开发效率 | 不用写SQL,开发快 | 复杂查询不直观 |
| 安全性 | 自动防SQL注入 | 可能生成低效SQL |
| 可维护性 | 代码即数据库文档 | 学习成本 |
| 性能 | 连接池内置 | N+1查询性能杀手 |
Python主流ORM¶
| ORM | 特点 | 适用场景 |
|---|---|---|
| SQLAlchemy | 功能最强,灵活 | 复杂项目 |
| Django ORM | 与Django绑定,简单 | Django项目 |
| Peewee | 轻量简单 | 小项目 |
| Tortoise-ORM | 异步支持 | FastAPI异步项目 |
实战代码¶
# ========== SQLAlchemy 2.0 ORM实战 ==========
# pip install sqlalchemy psycopg2-binary
from sqlalchemy import create_engine, Column, String, Integer, Float, Date, ForeignKey
from sqlalchemy.orm import DeclarativeBase, relationship, Session, selectinload, joinedload
from sqlalchemy import select, func
from datetime import date
# ========== 1. 定义数据模型(ORM的核心) ==========
class Base(DeclarativeBase):
"""所有模型类的基类"""
pass
class Patient(Base):
"""患者表"""
__tablename__ = "patients" # 对应的数据库表名
patient_id = Column(Integer, primary_key=True, autoincrement=True)
patient_name = Column(String(100), nullable=False)
age = Column(Integer)
# 关系定义(一对多:一个患者有多个样本)
samples = relationship("Sample", back_populates="patient",
cascade="all, delete-orphan") # 级联删除
def __repr__(self):
return f"<Patient(id={self.patient_id}, name={self.patient_name})>"
class Sample(Base):
"""样本表"""
__tablename__ = "samples"
sample_id = Column(String(20), primary_key=True)
patient_id = Column(Integer, ForeignKey("patients.patient_id"), nullable=False)
tissue_type = Column(String(50))
collection_date = Column(Date, default=date.today)
# 反向关系(多对一:样本属于患者)
patient = relationship("Patient", back_populates="samples")
# 关系(一对多:样本有多条表达数据)
expressions = relationship("GeneExpression", back_populates="sample",
lazy="dynamic") # lazy=dynamic: 按需加载
class GeneExpression(Base):
"""基因表达量表"""
__tablename__ = "gene_expression"
expr_id = Column(Integer, primary_key=True, autoincrement=True)
gene_id = Column(String(20), nullable=False, index=True) # 常查询列加索引
sample_id = Column(String(20), ForeignKey("samples.sample_id"), nullable=False)
expression = Column(Float, nullable=False)
condition = Column(String(50), index=True) # 常过滤列加索引
sample = relationship("Sample", back_populates="expressions")
# ========== 2. 建表和初始化 ==========
engine = create_engine("sqlite:///biodb.sqlite") # 用SQLite演示(无需安装)
Base.metadata.create_all(engine) # 自动创建所有表(如不存在)
print("数据库表创建成功!")
# ========== 3. 增删改查(CRUD) ==========
with Session(engine) as session:
# ===== 创建(Create)=====
patient = Patient(patient_name="张三", age=45)
session.add(patient)
session.flush() # 刷新到数据库(获取自增ID,但不提交)
sample = Sample(sample_id="S001", patient_id=patient.patient_id,
tissue_type="colon_tumor")
session.add(sample)
# 批量插入(更高效)
expressions = [
GeneExpression(gene_id="BRCA1", sample_id="S001", expression=245.7, condition="tumor"),
GeneExpression(gene_id="TP53", sample_id="S001", expression=89.3, condition="tumor"),
GeneExpression(gene_id="KRAS", sample_id="S001", expression=412.1, condition="tumor"),
]
session.add_all(expressions) # 批量添加
session.commit() # 提交事务
print(f"患者已创建,ID: {patient.patient_id}")
# ===== 查询(Read)=====
# 方式1:get by primary key
p = session.get(Patient, 1) # 通过主键查询
print(f"患者: {p}")
# 方式2:where条件查询
stmt = select(GeneExpression).where(
GeneExpression.condition == "tumor",
GeneExpression.expression > 100
).order_by(GeneExpression.expression.desc()) # 降序
results = session.execute(stmt).scalars().all()
for expr in results:
print(f"基因: {expr.gene_id}, 表达量: {expr.expression}")
# 方式3:聚合查询
avg_stmt = select(
GeneExpression.condition,
func.avg(GeneExpression.expression).label("avg_expr"),
func.count(GeneExpression.expr_id).label("count")
).group_by(GeneExpression.condition)
for row in session.execute(avg_stmt):
print(f"条件: {row.condition}, 平均: {row.avg_expr:.2f}, 数量: {row.count}")
# ========== 4. N+1查询问题与解决方案 ==========
with Session(engine) as session:
# ❌ N+1问题:查询N个样本,每个样本又单独查患者信息
# 这会产生 1 + N 个SQL查询!
samples = session.execute(select(Sample)).scalars().all()
for s in samples:
# 每次访问s.patient都会触发一个额外的SQL查询!
print(f"样本 {s.sample_id} 的患者: {s.patient.patient_name}")
# ✅ 解决方案1:使用joinedload(一个JOIN查询搞定)
stmt = select(Sample).options(
joinedload(Sample.patient) # 预先JOIN加载patient
)
samples = session.execute(stmt).unique().scalars().all()
for s in samples:
# 现在访问s.patient不会触发额外查询(已预加载)
print(f"样本 {s.sample_id} 的患者: {s.patient.patient_name}")
# ✅ 解决方案2:selectinload(适合一对多关系)
stmt = select(Patient).options(
selectinload(Patient.samples) # 用IN子句批量加载samples
)
patients = session.execute(stmt).scalars().all()
for p in patients:
# samples已预加载,不会N+1
print(f"患者 {p.patient_name} 有 {len(p.samples)} 个样本")
# ========== 5. 批量操作优化 ==========
with Session(engine) as session:
# 批量更新(比逐行update快100x)
from sqlalchemy import update
stmt = update(GeneExpression).where(
GeneExpression.condition == "tumor"
).values(
condition="tumor_validated" # 批量修改condition字段
)
result = session.execute(stmt)
print(f"批量更新了 {result.rowcount} 条记录")
session.commit()
# 批量插入(bulk_insert_mappings,比逐条add快10x)
new_data = [
{"gene_id": f"GENE_{i}", "sample_id": "S001",
"expression": i * 10.5, "condition": "normal"}
for i in range(100)
]
session.execute(GeneExpression.__table__.insert(), new_data) # 批量插入
session.commit()
print("批量插入完成")
面试常问点¶
Q: 什么是N+1查询问题?如何解决? A: 查N条记录时,ORM懒加载关联表,产生N个额外查询(总共N+1个)。解决:用
joinedload(JOIN一次性取全部)或selectinload(IN子句批量取)预加载关联数据。Q: ORM的lazy loading和eager loading有什么区别? A: lazy loading(懒加载):访问关联属性时才查数据库(默认行为,可能N+1);eager loading(预加载):查主对象时就同时查关联对象(joinedload/selectinload)。
Q: 什么时候放弃ORM直接写SQL? A: 复杂的聚合/窗口函数、批量数据导入(ETL)、需要特定数据库特性(PostgreSQL的JSONB操作)时,直接写原生SQL更清晰高效。SQLAlchemy也支持
text()执行原生SQL。Q: ORM的session和transaction是什么关系? A: Session是工作单元(Unit of Work),管理对象状态;Transaction是数据库事务。Session内隐式开启事务,commit提交,rollback/close回滚。
速查表¶
# 增
session.add(obj)
session.add_all([obj1, obj2])
session.commit()
# 查
session.get(Model, pk) # 按主键
session.execute(select(Model).where(Model.col == val)).scalars().all()
# 改
obj.field = new_value
session.commit()
# 删
session.delete(obj)
session.commit()
# 预加载(避免N+1)
select(Model).options(joinedload(Model.relation))
select(Model).options(selectinload(Model.relation))