436_并发设计模式¶
一句话说明¶
并发设计模式是解决多线程/多进程环境下协作、同步、资源共享等问题的经典套路,面试常考生产者消费者、读写锁、线程池等。
核心知识点¶
白话理解¶
想象一个餐厅:厨师(生产者)做菜,服务员(消费者)端菜,备餐台(缓冲区)放菜。如果备餐台满了厨师等,空了服务员等——这就是生产者消费者模式。并发设计模式就是处理这类"谁等谁、谁通知谁"问题的标准方案。
核心模式¶
- 生产者-消费者模式:生产者放数据,消费者取数据,队列作缓冲
- 读写锁模式:多读者可并行,写者独占
- 线程池模式:预先创建线程复用,避免频繁创建销毁
- Future/Promise模式:异步任务占位符,稍后获取结果
- Actor模式:每个Actor独立,通过消息通信,不共享状态
经典题目与解法¶
题目1:用有界队列实现生产者消费者(手写多线程)¶
题意:实现一个线程安全的有界队列,生产者放满后阻塞,消费者取空后阻塞。
import threading
from collections import deque
class BoundedQueue:
def __init__(self, capacity):
self.capacity = capacity # 队列最大容量
self.queue = deque() # 双端队列存数据
self.lock = threading.Lock() # 互斥锁保护队列
self.not_full = threading.Condition(self.lock) # 未满条件变量
self.not_empty = threading.Condition(self.lock) # 非空条件变量
def put(self, item):
with self.not_full: # 获取锁
while len(self.queue) >= self.capacity:
self.not_full.wait() # 队列满,生产者等待
self.queue.append(item) # 放入数据
self.not_empty.notify() # 通知消费者可以取了
def get(self):
with self.not_empty: # 获取锁
while len(self.queue) == 0:
self.not_empty.wait() # 队列空,消费者等待
item = self.queue.popleft() # 取出数据
self.not_full.notify() # 通知生产者可以放了
return item
# 测试
q = BoundedQueue(3)
import time
def producer():
for i in range(5):
q.put(i) # 生产数据
print(f"生产: {i}")
time.sleep(0.1)
def consumer():
for i in range(5):
val = q.get() # 消费数据
print(f"消费: {val}")
time.sleep(0.3)
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
t1.join(); t2.join()
时间复杂度:put/get 均 O(1)
空间复杂度:O(capacity)
题目2:实现一个简单线程池¶
题意:预先创建固定数量线程,接受任务并执行,支持优雅关闭。
import threading
from queue import Queue
class ThreadPool:
def __init__(self, num_workers):
self.task_queue = Queue() # 任务队列
self.workers = [] # 工作线程列表
self._stop_event = threading.Event() # 停止信号
for _ in range(num_workers):
t = threading.Thread(target=self._worker)
t.daemon = True # 设为守护线程,主线程退出时自动结束
t.start()
self.workers.append(t)
def _worker(self):
while not self._stop_event.is_set(): # 未收到停止信号就循环
try:
func, args = self.task_queue.get(timeout=1) # 阻塞取任务
func(*args) # 执行任务
self.task_queue.task_done() # 标记任务完成
except:
pass # 超时或异常继续等待
def submit(self, func, *args):
self.task_queue.put((func, args)) # 提交任务到队列
def shutdown(self):
self.task_queue.join() # 等待所有任务完成
self._stop_event.set() # 发出停止信号
# 使用示例
pool = ThreadPool(3)
results = []
def task(n):
results.append(n * n)
for i in range(10):
pool.submit(task, i)
pool.shutdown()
print(sorted(results)) # 输出各平方值
时间复杂度:任务执行取决于任务本身,调度 O(1)
空间复杂度:O(num_workers + queue_size)
面试技巧¶
- 必说 happens-before:面试官喜欢听你聊内存可见性,提一句 volatile / happens-before 加分
- 死锁四条件:互斥、持有并等待、不可剥夺、循环等待——背熟能快速扯出来
- 锁粒度权衡:锁太粗吞吐低,锁太细容易死锁,说出这个权衡点
- Python GIL 局限:Python 多线程受 GIL 限制,CPU 密集任务用多进程,IO 密集用多线程或 asyncio
- 推荐使用标准库:实际生产用
concurrent.futures.ThreadPoolExecutor,别手写
速查表¶
| 模式 | 核心工具 | 适用场景 |
|---|---|---|
| 生产者消费者 | Queue + Condition | 解耦生产消费速度差异 |
| 读写锁 | threading.RLock | 读多写少场景 |
| 线程池 | ThreadPoolExecutor | 大量短任务并行 |
| Future | concurrent.futures | 异步获取结果 |
| 信号量 | threading.Semaphore | 限制并发数量 |
# Python 并发三件套速查
import threading # 线程
import multiprocessing # 进程
import asyncio # 协程(IO密集首选)
# 推荐写法:线程池
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(task, i) for i in range(10)]
results = [f.result() for f in futures]