546_数据库分片与读写分离
一句话说明
读写分离让主库专注写入,从库承接查询;分片则将数据水平切割到多台数据库,解决单机容量瓶颈。
核心知识点
读写分离
应用层
├─ 写操作 → 主库(Master)← 所有写入
└─ 读操作 → 从库(Slave1/Slave2) ← 查询分流
主从同步:
MySQL: Binlog 异步/半同步复制
PostgreSQL: WAL 流复制
注意:存在复制延迟(通常<1秒),
对强一致性要求高的读操作要走主库!
分片策略
| 策略 | 方式 | 优点 | 缺点 |
|---|
| 范围分片 | 按ID范围分 | 范围查询快 | 热点数据不均 |
| 哈希分片 | hash(key) % N | 分布均匀 | 范围查询慢 |
| 一致性哈希 | 哈希环 | 扩容影响小 | 实现复杂 |
| 按地理 | 按区域分 | 数据本地化 | 跨区域查询难 |
| 按时间 | 按月/年分表 | 归档方便 | 跨时间查询慢 |
垂直拆分 vs 水平拆分
垂直拆分(按功能):
用户库:users, user_profiles
订单库:orders, order_items
分析库:samples, analysis_jobs
水平拆分(按数据量):
samples表 → samples_0, samples_1, samples_2, samples_3
根据 sample_id % 4 路由到对应表
实战代码/设计图/模板
读写分离架构图
应用服务
│
[数据库中间件 ProxySQL / MyCat]
│ │
写请求 读请求
│ │
[主库] [从库1][从库2]
写入 ←→读取 ←→读取
│
Binlog同步
│
[从库1] [从库2](复制延迟<1s)
Python 读写分离实现
import random
from contextlib import contextmanager
import psycopg2
class DatabaseRouter:
"""数据库读写分离路由"""
def __init__(self):
self.master = {
"host": "db-master.example.com",
"port": 5432, "dbname": "biodb"
}
self.slaves = [
{"host": "db-slave1.example.com", "port": 5432, "dbname": "biodb"},
{"host": "db-slave2.example.com", "port": 5432, "dbname": "biodb"},
]
def get_master(self):
"""获取主库连接(用于写操作)"""
return psycopg2.connect(**self.master)
def get_slave(self):
"""随机获取从库连接(用于读操作)"""
slave_config = random.choice(self.slaves)
return psycopg2.connect(**slave_config)
@contextmanager
def write_conn(self):
"""写操作上下文管理器"""
conn = self.get_master()
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
@contextmanager
def read_conn(self):
"""读操作上下文管理器"""
conn = self.get_slave()
try:
yield conn
finally:
conn.close()
router = DatabaseRouter()
# 写操作用主库
def create_sample(name: str, project_id: str):
with router.write_conn() as conn:
cur = conn.cursor()
cur.execute(
"INSERT INTO samples (name, project_id) VALUES (%s, %s) RETURNING id",
(name, project_id)
)
return cur.fetchone()[0]
# 读操作用从库
def get_samples(project_id: str):
with router.read_conn() as conn:
cur = conn.cursor()
cur.execute(
"SELECT * FROM samples WHERE project_id = %s",
(project_id,)
)
return cur.fetchall()
哈希分片路由
class ShardRouter:
"""哈希分片路由(4个分片)"""
SHARD_COUNT = 4
def __init__(self):
self.shards = {
0: "postgresql://db-shard0:5432/biodb",
1: "postgresql://db-shard1:5432/biodb",
2: "postgresql://db-shard2:5432/biodb",
3: "postgresql://db-shard3:5432/biodb",
}
def get_shard(self, shard_key: str) -> int:
"""根据 shard_key 计算分片索引"""
return hash(shard_key) % self.SHARD_COUNT
def get_connection(self, sample_id: str):
"""根据 sample_id 获取对应分片连接"""
shard_id = self.get_shard(sample_id)
return self.shards[shard_id]
def get_table_name(self, base_table: str, sample_id: str) -> str:
"""获取分片表名"""
shard_id = self.get_shard(sample_id)
return f"{base_table}_{shard_id}"
shard_router = ShardRouter()
def get_sample(sample_id: str) -> dict:
table = shard_router.get_table_name("samples", sample_id)
conn_str = shard_router.get_connection(sample_id)
# 查询对应分片
# SELECT * FROM samples_2 WHERE id = 'xxx'
pass
面试常问点
| 问题 | 参考答案 |
|---|
| 主从延迟怎么处理? | 强一致读走主库;业务可以接受延迟才走从库 |
| 分片后如何做跨分片查询? | 汇总到应用层合并;或使用分布式查询引擎 |
| 如何动态扩容分片? | 一致性哈希减少迁移量;翻倍扩容 |
| 分布式事务怎么处理? | 2PC/Saga/TCC,尽量避免跨分片事务 |
| 为什么不直接用分布式数据库? | TiDB/CockroachDB有运维成本,大厂才用 |
速查表
MySQL 主从配置要点:
主库:开启 binlog,设置 server-id
从库:CHANGE MASTER TO,START SLAVE
查看延迟:SHOW SLAVE STATUS\G → Seconds_Behind_Master
常见分片中间件:
MyCat(Java,国产)
ShardingSphere(Java,Apache顶级项目)
Vitess(Go,YouTube出品,适合MySQL)
ProxySQL(轻量级读写分离代理)
PostgreSQL 主从:
主库:wal_level = replica
从库:recovery.conf 配置 primary_conninfo
查看延迟:pg_stat_replication