419_数据库连接池管理¶
一句话说明¶
连接池就是预先建好一批数据库连接放着,代码需要时直接借用,用完还回去,避免每次都重新建连接的开销。
核心知识点¶
为什么需要连接池¶
- 建连接代价高:TCP握手 + 数据库认证 + 初始化,需要10-50ms
- 高并发问题:100个请求同时来,100个连接同时建→数据库崩溃
- 连接池方案:预建固定数量连接,请求从池里借用,用完归还
关键参数¶
| 参数 | 含义 | 建议值 |
|---|---|---|
| pool_size | 连接池大小(保持的连接数) | CPU核数 × 2 + 磁盘数 |
| max_overflow | 超出pool_size的最大额外连接数 | pool_size的50% |
| pool_timeout | 等待获取连接的超时时间 | 30秒 |
| pool_recycle | 连接回收时间(防止连接过期) | 3600秒 |
| pool_pre_ping | 使用前ping一下(防止使用已断开的连接) | True |
实战代码¶
# ========== Python数据库连接池管理 ==========
# ========== 1. SQLAlchemy连接池(最常用的Python ORM) ==========
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool
# 创建带连接池的数据库引擎
engine = create_engine(
"postgresql://user:password@localhost:5432/biodb",
# 连接池配置
pool_size=10, # 连接池大小(保持10个连接)
max_overflow=20, # 超出时最多再建20个连接(峰值=10+20=30)
pool_timeout=30, # 等待连接超时30秒(超时抛异常而非无限等待)
pool_recycle=3600, # 每小时回收连接(防止MySQL的8小时超时断开)
pool_pre_ping=True, # 使用连接前先ping(自动剔除断开的连接)
echo=False # 不打印SQL日志(生产环境)
)
# 查询池状态
def print_pool_status(engine):
"""打印连接池当前状态"""
pool = engine.pool
print(f"池大小: {pool.size()}") # 当前连接总数
print(f"已检出: {pool.checkedout()}") # 正在使用的连接数
print(f"可用: {pool.checkedin()}") # 空闲连接数
print(f"溢出: {pool.overflow()}") # 超出pool_size的连接数
# 使用连接(with语句自动归还连接)
def query_gene_expression(gene_id: str) -> list:
"""查询基因表达量(连接自动从池中获取和归还)"""
with engine.connect() as conn: # 从池获取连接
result = conn.execute(
text("SELECT sample_id, expression FROM gene_expression WHERE gene_id = :gene_id"),
{"gene_id": gene_id} # 参数化查询(防SQL注入)
)
return result.fetchall()
# with块结束后连接自动归还到池!
# 批量查询(使用同一个连接)
def batch_query(gene_ids: list) -> dict:
"""批量查询(多个查询共用一个连接,减少连接获取次数)"""
results = {}
with engine.connect() as conn: # 获取一次连接
for gene_id in gene_ids:
result = conn.execute(
text("SELECT AVG(expression) as avg_expr FROM gene_expression WHERE gene_id = :gid"),
{"gid": gene_id}
)
row = result.fetchone()
results[gene_id] = row[0] if row else None
return results # 连接最后归还
print_pool_status(engine)
# ========== 2. psycopg2连接池(PostgreSQL原生驱动) ==========
from psycopg2 import pool as pg_pool
import threading
# 创建线程安全的连接池
connection_pool = pg_pool.ThreadedConnectionPool(
minconn=2, # 最少保持2个连接
maxconn=20, # 最多20个连接
host="localhost",
database="biodb",
user="biouser",
password="biopass",
port=5432
)
def get_sample_metadata(sample_id: str) -> dict:
"""使用psycopg2连接池查询"""
conn = connection_pool.getconn() # 从池中获取连接
try:
with conn.cursor() as cur: # 游标自动关闭
cur.execute(
"SELECT * FROM samples WHERE sample_id = %s",
(sample_id,)
)
row = cur.fetchone()
if row:
col_names = [desc[0] for desc in cur.description] # 获取列名
return dict(zip(col_names, row))
return {}
except Exception as e:
conn.rollback() # 出错时回滚
raise e
finally:
connection_pool.putconn(conn) # ★ 必须归还连接!用try-finally确保
# ========== 3. 异步连接池(asyncpg,适合FastAPI等异步框架) ==========
import asyncio
import asyncpg
async def create_async_pool():
"""创建异步PostgreSQL连接池"""
pool = await asyncpg.create_pool(
"postgresql://user:password@localhost/biodb",
min_size=5, # 最少5个连接
max_size=20, # 最多20个连接
command_timeout=60 # SQL执行超时60秒
)
return pool
async def async_query_example(pool: asyncpg.Pool, gene_id: str):
"""异步查询示例(适合高并发Web应用)"""
async with pool.acquire() as conn: # 异步获取连接
rows = await conn.fetch(
"SELECT sample_id, expression FROM gene_expression WHERE gene_id = $1",
gene_id # $1 是asyncpg的参数占位符
)
return [dict(row) for row in rows]
# 并发查询示例
async def parallel_queries(pool, gene_ids: list):
"""并发查询多个基因(利用连接池同时处理多个查询)"""
tasks = [async_query_example(pool, gid) for gid in gene_ids]
results = await asyncio.gather(*tasks) # 并发执行所有查询
return dict(zip(gene_ids, results))
# ========== 4. 连接池监控与故障处理 ==========
import time
def monitor_pool_health(engine, interval=60):
"""定期监控连接池健康状态"""
pool = engine.pool
checkedout = pool.checkedout() # 使用中的连接数
pool_size = pool.size() # 总连接数
usage_pct = checkedout / pool_size * 100 if pool_size > 0 else 0
print(f"[{time.strftime('%H:%M:%S')}] 连接池使用率: {usage_pct:.1f}% ({checkedout}/{pool_size})")
# 告警阈值
if usage_pct > 80:
print("⚠️ 警告:连接池使用率超过80%,考虑增大pool_size!")
if usage_pct > 95:
print("🚨 紧急:连接池即将耗尽,可能导致请求超时!")
return usage_pct
面试常问点¶
Q: 连接池大小设置多少合适? A: 经典公式:
pool_size = CPU核数 × 2 + 磁盘数。对于IO密集型数据库查询,2-10倍CPU核数;同时要考虑数据库的max_connections限制。Q: 为什么要设置pool_recycle(连接回收时间)? A: MySQL默认8小时不活跃自动断开连接(wait_timeout),如果连接池持有8小时以上的连接就会失效。pool_recycle<8小时确保连接在过期前被重建。
Q: pool_pre_ping有什么作用?为什么要开启? A: 每次从池获取连接时先发一个轻量的ping(SELECT 1),检测连接是否还活着。开启后即使连接已断开,也不会把"死连接"给代码使用。代价是每次多一次往返,但比连接失败再重试更稳定。
Q: 什么是连接泄漏(Connection Leak)?如何排查? A: 代码获取连接后忘记归还(没有用try-finally或上下文管理器),导致连接被占用但不工作。排查:监控pool.checkedout()长时间等于pool_size,或查看数据库的连接列表(SHOW PROCESSLIST)。
速查表¶
# SQLAlchemy引擎创建(推荐参数)
engine = create_engine(
"postgresql://user:pass@host/db",
pool_size=10,
max_overflow=20,
pool_timeout=30,
pool_recycle=3600,
pool_pre_ping=True
)
# 使用连接(自动归还)
with engine.connect() as conn:
result = conn.execute(text("SELECT 1"))
# 使用会话(ORM)
from sqlalchemy.orm import sessionmaker
Session = sessionmaker(bind=engine)
with Session() as session:
session.execute(...)
session.commit()