跳转至

436_并发设计模式


一句话说明

并发设计模式是解决多线程/多进程环境下协作、同步、资源共享等问题的经典套路,面试常考生产者消费者、读写锁、线程池等。


核心知识点

白话理解

想象一个餐厅:厨师(生产者)做菜,服务员(消费者)端菜,备餐台(缓冲区)放菜。如果备餐台满了厨师等,空了服务员等——这就是生产者消费者模式。并发设计模式就是处理这类"谁等谁、谁通知谁"问题的标准方案。

核心模式

  1. 生产者-消费者模式:生产者放数据,消费者取数据,队列作缓冲
  2. 读写锁模式:多读者可并行,写者独占
  3. 线程池模式:预先创建线程复用,避免频繁创建销毁
  4. Future/Promise模式:异步任务占位符,稍后获取结果
  5. Actor模式:每个Actor独立,通过消息通信,不共享状态

经典题目与解法

题目1:用有界队列实现生产者消费者(手写多线程)

题意:实现一个线程安全的有界队列,生产者放满后阻塞,消费者取空后阻塞。

import threading
from collections import deque

class BoundedQueue:
    def __init__(self, capacity):
        self.capacity = capacity          # 队列最大容量
        self.queue = deque()              # 双端队列存数据
        self.lock = threading.Lock()      # 互斥锁保护队列
        self.not_full = threading.Condition(self.lock)   # 未满条件变量
        self.not_empty = threading.Condition(self.lock)  # 非空条件变量

    def put(self, item):
        with self.not_full:               # 获取锁
            while len(self.queue) >= self.capacity:
                self.not_full.wait()      # 队列满,生产者等待
            self.queue.append(item)       # 放入数据
            self.not_empty.notify()       # 通知消费者可以取了

    def get(self):
        with self.not_empty:              # 获取锁
            while len(self.queue) == 0:
                self.not_empty.wait()     # 队列空,消费者等待
            item = self.queue.popleft()   # 取出数据
            self.not_full.notify()        # 通知生产者可以放了
            return item

# 测试
q = BoundedQueue(3)
import time

def producer():
    for i in range(5):
        q.put(i)                         # 生产数据
        print(f"生产: {i}")
        time.sleep(0.1)

def consumer():
    for i in range(5):
        val = q.get()                    # 消费数据
        print(f"消费: {val}")
        time.sleep(0.3)

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
t1.join(); t2.join()

时间复杂度:put/get 均 O(1)
空间复杂度:O(capacity)


题目2:实现一个简单线程池

题意:预先创建固定数量线程,接受任务并执行,支持优雅关闭。

import threading
from queue import Queue

class ThreadPool:
    def __init__(self, num_workers):
        self.task_queue = Queue()         # 任务队列
        self.workers = []                 # 工作线程列表
        self._stop_event = threading.Event()  # 停止信号

        for _ in range(num_workers):
            t = threading.Thread(target=self._worker)
            t.daemon = True               # 设为守护线程,主线程退出时自动结束
            t.start()
            self.workers.append(t)

    def _worker(self):
        while not self._stop_event.is_set():  # 未收到停止信号就循环
            try:
                func, args = self.task_queue.get(timeout=1)  # 阻塞取任务
                func(*args)               # 执行任务
                self.task_queue.task_done()  # 标记任务完成
            except:
                pass                      # 超时或异常继续等待

    def submit(self, func, *args):
        self.task_queue.put((func, args)) # 提交任务到队列

    def shutdown(self):
        self.task_queue.join()            # 等待所有任务完成
        self._stop_event.set()            # 发出停止信号

# 使用示例
pool = ThreadPool(3)
results = []

def task(n):
    results.append(n * n)

for i in range(10):
    pool.submit(task, i)

pool.shutdown()
print(sorted(results))  # 输出各平方值

时间复杂度:任务执行取决于任务本身,调度 O(1)
空间复杂度:O(num_workers + queue_size)


面试技巧

  1. 必说 happens-before:面试官喜欢听你聊内存可见性,提一句 volatile / happens-before 加分
  2. 死锁四条件:互斥、持有并等待、不可剥夺、循环等待——背熟能快速扯出来
  3. 锁粒度权衡:锁太粗吞吐低,锁太细容易死锁,说出这个权衡点
  4. Python GIL 局限:Python 多线程受 GIL 限制,CPU 密集任务用多进程,IO 密集用多线程或 asyncio
  5. 推荐使用标准库:实际生产用 concurrent.futures.ThreadPoolExecutor,别手写

速查表

模式核心工具适用场景
生产者消费者Queue + Condition解耦生产消费速度差异
读写锁threading.RLock读多写少场景
线程池ThreadPoolExecutor大量短任务并行
Futureconcurrent.futures异步获取结果
信号量threading.Semaphore限制并发数量
# Python 并发三件套速查
import threading          # 线程
import multiprocessing    # 进程
import asyncio            # 协程(IO密集首选)

# 推荐写法:线程池
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(task, i) for i in range(10)]
    results = [f.result() for f in futures]