611 RabbitMQ 消息队列¶
一句话概述:RabbitMQ 是最流行的开源消息队列,实现了 AMQP 协议,用于应用之间的异步通信,白话就是"应用之间传话的邮局"。
核心知识点速查表¶
| 知识点 | 说明 |
|---|---|
| 最新版本 | 4.3.0(2026年) |
| 核心协议 | AMQP 1.0(原生)、MQTT、STOMP |
| 核心概念 | Producer → Exchange → Queue → Consumer |
| 队列类型 | Quorum Queue(推荐)、Classic Queue、Stream |
| 管理界面 | Web UI(默认端口 15672) |
| 适用场景 | 异步任务、微服务通信、事件驱动、削峰填谷 |
一、安装配置¶
1.1 Docker 安装(推荐)¶
# 带管理界面的版本
docker run -d \
--name rabbitmq \ # 容器名
-p 5672:5672 \ # AMQP 协议端口
-p 15672:15672 \ # Web 管理界面端口
-e RABBITMQ_DEFAULT_USER=admin \ # 管理员用户
-e RABBITMQ_DEFAULT_PASS=password \ # 管理员密码
rabbitmq:4-management # 带管理插件的镜像
# 访问管理界面:http://localhost:15672
# 用 admin/password 登录
1.2 核心概念图解¶
Producer(生产者) Consumer(消费者)
| ↑
↓ |
Exchange(交换机)→ Binding → Queue(队列)→
| 规则 |
| |
|--- direct(精确路由) |--- 消息在这里排队
|--- fanout(广播) |--- 先进先出
|--- topic(模式匹配) |--- 可持久化
|--- headers(头匹配)
白话解释:
- Producer = 寄信人
- Exchange = 邮局(决定信件去哪个邮箱)
- Queue = 邮箱(信件排队等取件)
- Consumer = 收信人
- Binding = 邮局的投递规则
二、基本使用(Python)¶
2.1 安装客户端¶
2.2 简单生产者/消费者¶
# === producer.py(生产者:发送消息)===
import pika # 导入 RabbitMQ 客户端
# 建立连接
credentials = pika.PlainCredentials('admin', 'password') # 认证信息
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost', 5672, '/', credentials) # 连接参数
)
channel = connection.channel() # 创建频道
# 声明队列(如果不存在就创建)
channel.queue_declare(
queue='task_queue', # 队列名
durable=True # 持久化(RabbitMQ重启后队列还在)
)
# 发送消息
message = '处理订单 #12345'
channel.basic_publish(
exchange='', # 空字符串 = 默认交换机
routing_key='task_queue', # 路由键 = 队列名(直接投递)
body=message, # 消息内容
properties=pika.BasicProperties(
delivery_mode=2 # 消息持久化(2=持久化)
)
)
print(f"已发送: {message}")
connection.close() # 关闭连接
# === consumer.py(消费者:接收消息)===
import pika
import time
credentials = pika.PlainCredentials('admin', 'password')
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost', 5672, '/', credentials)
)
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
def callback(ch, method, properties, body):
"""收到消息时的回调函数"""
message = body.decode() # 解码消息
print(f"收到: {message}")
time.sleep(2) # 模拟处理耗时
print(f"处理完成: {message}")
ch.basic_ack(delivery_tag=method.delivery_tag) # 手动确认(告诉RabbitMQ已处理)
# 每次只处理一条消息(公平分发)
channel.basic_qos(prefetch_count=1) # 未确认的消息最多1条
# 开始消费
channel.basic_consume(
queue='task_queue', # 监听的队列
on_message_callback=callback # 回调函数
# auto_ack=False 是默认值,手动确认模式
)
print("等待消息... 按 Ctrl+C 退出")
channel.start_consuming() # 阻塞等待消息
2.3 发布/订阅(Fanout)¶
# === 发布者:广播消息给所有订阅者 ===
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明 fanout 交换机(广播模式)
channel.exchange_declare(
exchange='notifications', # 交换机名
exchange_type='fanout' # fanout = 广播给所有绑定的队列
)
channel.basic_publish(
exchange='notifications', # 指定交换机
routing_key='', # fanout 模式不需要路由键
body='系统通知:服务器将在22:00维护'
)
connection.close()
# === 订阅者:每个订阅者有自己的队列 ===
channel = connection.channel()
channel.exchange_declare(exchange='notifications', exchange_type='fanout')
# 创建临时队列(断开连接自动删除)
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue # 获取随机队列名
# 绑定队列到交换机
channel.queue_bind(exchange='notifications', queue=queue_name)
def callback(ch, method, properties, body):
print(f"收到通知: {body.decode()}")
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
三、高级用法¶
3.1 Topic 交换机(模式匹配路由)¶
# 白话:用通配符决定消息去哪个队列
# * 匹配一个单词,# 匹配零个或多个单词
channel.exchange_declare(exchange='logs', exchange_type='topic')
# 发送
channel.basic_publish(exchange='logs', routing_key='app.error.database', body='DB连接失败')
channel.basic_publish(exchange='logs', routing_key='app.info.user', body='用户登录')
# 订阅者1:只收 error 级别
channel.queue_bind(exchange='logs', queue=q1, routing_key='app.error.*')
# 订阅者2:收所有 app 开头的
channel.queue_bind(exchange='logs', queue=q2, routing_key='app.#')
3.2 死信队列(DLX)¶
# 白话:处理失败的消息不直接丢弃,而是转到"死信队列"等待人工处理
channel.queue_declare(
queue='main_queue',
arguments={
'x-dead-letter-exchange': 'dlx_exchange', # 死信交换机
'x-dead-letter-routing-key': 'dead_letters', # 死信路由键
'x-message-ttl': 60000 # 消息超时时间(60秒)
}
)
3.3 Quorum Queue(推荐的持久化队列)¶
# RabbitMQ 4.x 推荐使用 Quorum Queue 替代旧的镜像队列
channel.queue_declare(
queue='important_tasks',
arguments={
'x-queue-type': 'quorum' # 声明为 Quorum 队列
}
)
# Quorum Queue 基于 Raft 共识算法,自动在集群节点间复制
# 比旧的镜像队列更可靠,是 RabbitMQ 4.x 唯一支持的复制队列类型
四、常见报错与解决¶
4.1 连接被拒绝¶
解决:确认 RabbitMQ 在运行,端口 5672 可访问。
4.2 认证失败¶
解决:检查用户名密码,或在管理界面创建新用户。
4.3 队列消息堆积¶
解决:增加消费者数量,或设置 x-max-length 限制队列长度。
五、速查表¶
| 操作 | 说明 |
|---|---|
| 管理界面 | http://localhost:15672 |
| 列出队列 | rabbitmqctl list_queues |
| 列出交换机 | rabbitmqctl list_exchanges |
| 查看连接 | rabbitmqctl list_connections |
| 清空队列 | rabbitmqctl purge_queue queue_name |
| 启用插件 | rabbitmq-plugins enable rabbitmq_management |
六、同类工具对比¶
| 特性 | RabbitMQ | Kafka | NATS | Redis Pub/Sub |
|---|---|---|---|---|
| 消息模型 | 队列+交换机 | 日志流 | 发布订阅 | 发布订阅 |
| 消息持久化 | 支持 | 原生 | JetStream | 不支持 |
| 消息确认 | 逐条确认 | 偏移量 | 确认 | 无 |
| 吞吐量 | 万级/秒 | 百万级/秒 | 百万级/秒 | 万级/秒 |
| 复杂路由 | 极强 | 弱 | 弱 | 无 |
| 适合场景 | 任务队列 | 流处理 | 微服务 | 简单通知 |
选型建议:需要复杂路由和消息确认选 RabbitMQ;大规模日志/流处理选 Kafka;轻量级微服务通信选 NATS。
参考资料:RabbitMQ 官方文档 | Pika 文档 | RabbitMQ 4.3 发布