跳转至

419_数据库连接池管理


一句话说明

连接池就是预先建好一批数据库连接放着,代码需要时直接借用,用完还回去,避免每次都重新建连接的开销。


核心知识点

为什么需要连接池

  • 建连接代价高:TCP握手 + 数据库认证 + 初始化,需要10-50ms
  • 高并发问题:100个请求同时来,100个连接同时建→数据库崩溃
  • 连接池方案:预建固定数量连接,请求从池里借用,用完归还

关键参数

参数含义建议值
pool_size连接池大小(保持的连接数)CPU核数 × 2 + 磁盘数
max_overflow超出pool_size的最大额外连接数pool_size的50%
pool_timeout等待获取连接的超时时间30秒
pool_recycle连接回收时间(防止连接过期)3600秒
pool_pre_ping使用前ping一下(防止使用已断开的连接)True

实战代码

# ========== Python数据库连接池管理 ==========

# ========== 1. SQLAlchemy连接池(最常用的Python ORM) ==========
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool

# 创建带连接池的数据库引擎
engine = create_engine(
    "postgresql://user:password@localhost:5432/biodb",
    # 连接池配置
    pool_size=10,          # 连接池大小(保持10个连接)
    max_overflow=20,       # 超出时最多再建20个连接(峰值=10+20=30)
    pool_timeout=30,       # 等待连接超时30秒(超时抛异常而非无限等待)
    pool_recycle=3600,     # 每小时回收连接(防止MySQL的8小时超时断开)
    pool_pre_ping=True,    # 使用连接前先ping(自动剔除断开的连接)
    echo=False             # 不打印SQL日志(生产环境)
)

# 查询池状态
def print_pool_status(engine):
    """打印连接池当前状态"""
    pool = engine.pool
    print(f"池大小: {pool.size()}")          # 当前连接总数
    print(f"已检出: {pool.checkedout()}")    # 正在使用的连接数
    print(f"可用: {pool.checkedin()}")       # 空闲连接数
    print(f"溢出: {pool.overflow()}")        # 超出pool_size的连接数

# 使用连接(with语句自动归还连接)
def query_gene_expression(gene_id: str) -> list:
    """查询基因表达量(连接自动从池中获取和归还)"""
    with engine.connect() as conn:  # 从池获取连接
        result = conn.execute(
            text("SELECT sample_id, expression FROM gene_expression WHERE gene_id = :gene_id"),
            {"gene_id": gene_id}  # 参数化查询(防SQL注入)
        )
        return result.fetchall()
    # with块结束后连接自动归还到池!

# 批量查询(使用同一个连接)
def batch_query(gene_ids: list) -> dict:
    """批量查询(多个查询共用一个连接,减少连接获取次数)"""
    results = {}
    with engine.connect() as conn:  # 获取一次连接
        for gene_id in gene_ids:
            result = conn.execute(
                text("SELECT AVG(expression) as avg_expr FROM gene_expression WHERE gene_id = :gid"),
                {"gid": gene_id}
            )
            row = result.fetchone()
            results[gene_id] = row[0] if row else None
    return results  # 连接最后归还

print_pool_status(engine)

# ========== 2. psycopg2连接池(PostgreSQL原生驱动) ==========
from psycopg2 import pool as pg_pool
import threading

# 创建线程安全的连接池
connection_pool = pg_pool.ThreadedConnectionPool(
    minconn=2,    # 最少保持2个连接
    maxconn=20,   # 最多20个连接
    host="localhost",
    database="biodb",
    user="biouser",
    password="biopass",
    port=5432
)

def get_sample_metadata(sample_id: str) -> dict:
    """使用psycopg2连接池查询"""
    conn = connection_pool.getconn()  # 从池中获取连接
    try:
        with conn.cursor() as cur:   # 游标自动关闭
            cur.execute(
                "SELECT * FROM samples WHERE sample_id = %s",
                (sample_id,)
            )
            row = cur.fetchone()
            if row:
                col_names = [desc[0] for desc in cur.description]  # 获取列名
                return dict(zip(col_names, row))
            return {}
    except Exception as e:
        conn.rollback()  # 出错时回滚
        raise e
    finally:
        connection_pool.putconn(conn)  # ★ 必须归还连接!用try-finally确保

# ========== 3. 异步连接池(asyncpg,适合FastAPI等异步框架) ==========
import asyncio
import asyncpg

async def create_async_pool():
    """创建异步PostgreSQL连接池"""
    pool = await asyncpg.create_pool(
        "postgresql://user:password@localhost/biodb",
        min_size=5,    # 最少5个连接
        max_size=20,   # 最多20个连接
        command_timeout=60  # SQL执行超时60秒
    )
    return pool

async def async_query_example(pool: asyncpg.Pool, gene_id: str):
    """异步查询示例(适合高并发Web应用)"""
    async with pool.acquire() as conn:  # 异步获取连接
        rows = await conn.fetch(
            "SELECT sample_id, expression FROM gene_expression WHERE gene_id = $1",
            gene_id  # $1 是asyncpg的参数占位符
        )
        return [dict(row) for row in rows]

# 并发查询示例
async def parallel_queries(pool, gene_ids: list):
    """并发查询多个基因(利用连接池同时处理多个查询)"""
    tasks = [async_query_example(pool, gid) for gid in gene_ids]
    results = await asyncio.gather(*tasks)  # 并发执行所有查询
    return dict(zip(gene_ids, results))

# ========== 4. 连接池监控与故障处理 ==========
import time

def monitor_pool_health(engine, interval=60):
    """定期监控连接池健康状态"""
    pool = engine.pool

    checkedout = pool.checkedout()    # 使用中的连接数
    pool_size = pool.size()           # 总连接数

    usage_pct = checkedout / pool_size * 100 if pool_size > 0 else 0

    print(f"[{time.strftime('%H:%M:%S')}] 连接池使用率: {usage_pct:.1f}% ({checkedout}/{pool_size})")

    # 告警阈值
    if usage_pct > 80:
        print("⚠️ 警告:连接池使用率超过80%,考虑增大pool_size!")
    if usage_pct > 95:
        print("🚨 紧急:连接池即将耗尽,可能导致请求超时!")

    return usage_pct

面试常问点

  1. Q: 连接池大小设置多少合适? A: 经典公式:pool_size = CPU核数 × 2 + 磁盘数。对于IO密集型数据库查询,2-10倍CPU核数;同时要考虑数据库的max_connections限制。

  2. Q: 为什么要设置pool_recycle(连接回收时间)? A: MySQL默认8小时不活跃自动断开连接(wait_timeout),如果连接池持有8小时以上的连接就会失效。pool_recycle<8小时确保连接在过期前被重建。

  3. Q: pool_pre_ping有什么作用?为什么要开启? A: 每次从池获取连接时先发一个轻量的ping(SELECT 1),检测连接是否还活着。开启后即使连接已断开,也不会把"死连接"给代码使用。代价是每次多一次往返,但比连接失败再重试更稳定。

  4. Q: 什么是连接泄漏(Connection Leak)?如何排查? A: 代码获取连接后忘记归还(没有用try-finally或上下文管理器),导致连接被占用但不工作。排查:监控pool.checkedout()长时间等于pool_size,或查看数据库的连接列表(SHOW PROCESSLIST)。


速查表

# SQLAlchemy引擎创建(推荐参数)
engine = create_engine(
    "postgresql://user:pass@host/db",
    pool_size=10,
    max_overflow=20,
    pool_timeout=30,
    pool_recycle=3600,
    pool_pre_ping=True
)

# 使用连接(自动归还)
with engine.connect() as conn:
    result = conn.execute(text("SELECT 1"))

# 使用会话(ORM)
from sqlalchemy.orm import sessionmaker
Session = sessionmaker(bind=engine)
with Session() as session:
    session.execute(...)
    session.commit()