跳转至

611 RabbitMQ 消息队列

一句话概述:RabbitMQ 是最流行的开源消息队列,实现了 AMQP 协议,用于应用之间的异步通信,白话就是"应用之间传话的邮局"。

核心知识点速查表

知识点说明
最新版本4.3.0(2026年)
核心协议AMQP 1.0(原生)、MQTT、STOMP
核心概念Producer → Exchange → Queue → Consumer
队列类型Quorum Queue(推荐)、Classic Queue、Stream
管理界面Web UI(默认端口 15672)
适用场景异步任务、微服务通信、事件驱动、削峰填谷

一、安装配置

1.1 Docker 安装(推荐)

# 带管理界面的版本
docker run -d \
  --name rabbitmq \                    # 容器名
  -p 5672:5672 \                       # AMQP 协议端口
  -p 15672:15672 \                     # Web 管理界面端口
  -e RABBITMQ_DEFAULT_USER=admin \     # 管理员用户
  -e RABBITMQ_DEFAULT_PASS=password \  # 管理员密码
  rabbitmq:4-management                # 带管理插件的镜像

# 访问管理界面:http://localhost:15672
# 用 admin/password 登录

1.2 核心概念图解

Producer(生产者)                     Consumer(消费者)
    |                                      ↑
    ↓                                      |
Exchange(交换机)→ Binding → Queue(队列)→
    |               规则          |
    |                            |
    |--- direct(精确路由)       |--- 消息在这里排队
    |--- fanout(广播)          |--- 先进先出
    |--- topic(模式匹配)       |--- 可持久化
    |--- headers(头匹配)

白话解释:
- Producer = 寄信人
- Exchange = 邮局(决定信件去哪个邮箱)
- Queue = 邮箱(信件排队等取件)
- Consumer = 收信人
- Binding = 邮局的投递规则

二、基本使用(Python)

2.1 安装客户端

pip install pika                       # Python 的 RabbitMQ 客户端

2.2 简单生产者/消费者

# === producer.py(生产者:发送消息)===
import pika  # 导入 RabbitMQ 客户端

# 建立连接
credentials = pika.PlainCredentials('admin', 'password')  # 认证信息
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost', 5672, '/', credentials)  # 连接参数
)
channel = connection.channel()         # 创建频道

# 声明队列(如果不存在就创建)
channel.queue_declare(
    queue='task_queue',                # 队列名
    durable=True                       # 持久化(RabbitMQ重启后队列还在)
)

# 发送消息
message = '处理订单 #12345'
channel.basic_publish(
    exchange='',                       # 空字符串 = 默认交换机
    routing_key='task_queue',          # 路由键 = 队列名(直接投递)
    body=message,                      # 消息内容
    properties=pika.BasicProperties(
        delivery_mode=2                # 消息持久化(2=持久化)
    )
)
print(f"已发送: {message}")

connection.close()                     # 关闭连接
# === consumer.py(消费者:接收消息)===
import pika
import time

credentials = pika.PlainCredentials('admin', 'password')
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost', 5672, '/', credentials)
)
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

def callback(ch, method, properties, body):
    """收到消息时的回调函数"""
    message = body.decode()            # 解码消息
    print(f"收到: {message}")
    time.sleep(2)                      # 模拟处理耗时
    print(f"处理完成: {message}")
    ch.basic_ack(delivery_tag=method.delivery_tag)  # 手动确认(告诉RabbitMQ已处理)

# 每次只处理一条消息(公平分发)
channel.basic_qos(prefetch_count=1)    # 未确认的消息最多1条

# 开始消费
channel.basic_consume(
    queue='task_queue',                # 监听的队列
    on_message_callback=callback       # 回调函数
    # auto_ack=False 是默认值,手动确认模式
)

print("等待消息... 按 Ctrl+C 退出")
channel.start_consuming()              # 阻塞等待消息

2.3 发布/订阅(Fanout)

# === 发布者:广播消息给所有订阅者 ===
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明 fanout 交换机(广播模式)
channel.exchange_declare(
    exchange='notifications',          # 交换机名
    exchange_type='fanout'             # fanout = 广播给所有绑定的队列
)

channel.basic_publish(
    exchange='notifications',          # 指定交换机
    routing_key='',                    # fanout 模式不需要路由键
    body='系统通知:服务器将在22:00维护'
)
connection.close()

# === 订阅者:每个订阅者有自己的队列 ===
channel = connection.channel()
channel.exchange_declare(exchange='notifications', exchange_type='fanout')

# 创建临时队列(断开连接自动删除)
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue       # 获取随机队列名

# 绑定队列到交换机
channel.queue_bind(exchange='notifications', queue=queue_name)

def callback(ch, method, properties, body):
    print(f"收到通知: {body.decode()}")

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

三、高级用法

3.1 Topic 交换机(模式匹配路由)

# 白话:用通配符决定消息去哪个队列
# * 匹配一个单词,# 匹配零个或多个单词
channel.exchange_declare(exchange='logs', exchange_type='topic')

# 发送
channel.basic_publish(exchange='logs', routing_key='app.error.database', body='DB连接失败')
channel.basic_publish(exchange='logs', routing_key='app.info.user', body='用户登录')

# 订阅者1:只收 error 级别
channel.queue_bind(exchange='logs', queue=q1, routing_key='app.error.*')

# 订阅者2:收所有 app 开头的
channel.queue_bind(exchange='logs', queue=q2, routing_key='app.#')

3.2 死信队列(DLX)

# 白话:处理失败的消息不直接丢弃,而是转到"死信队列"等待人工处理
channel.queue_declare(
    queue='main_queue',
    arguments={
        'x-dead-letter-exchange': 'dlx_exchange',  # 死信交换机
        'x-dead-letter-routing-key': 'dead_letters',  # 死信路由键
        'x-message-ttl': 60000         # 消息超时时间(60秒)
    }
)

3.3 Quorum Queue(推荐的持久化队列)

# RabbitMQ 4.x 推荐使用 Quorum Queue 替代旧的镜像队列
channel.queue_declare(
    queue='important_tasks',
    arguments={
        'x-queue-type': 'quorum'       # 声明为 Quorum 队列
    }
)
# Quorum Queue 基于 Raft 共识算法,自动在集群节点间复制
# 比旧的镜像队列更可靠,是 RabbitMQ 4.x 唯一支持的复制队列类型

四、常见报错与解决

4.1 连接被拒绝

pika.exceptions.AMQPConnectionError: Connection refused

解决:确认 RabbitMQ 在运行,端口 5672 可访问。

4.2 认证失败

ACCESS_REFUSED - Login was refused

解决:检查用户名密码,或在管理界面创建新用户。

4.3 队列消息堆积

解决:增加消费者数量,或设置 x-max-length 限制队列长度。


五、速查表

操作说明
管理界面http://localhost:15672
列出队列rabbitmqctl list_queues
列出交换机rabbitmqctl list_exchanges
查看连接rabbitmqctl list_connections
清空队列rabbitmqctl purge_queue queue_name
启用插件rabbitmq-plugins enable rabbitmq_management

六、同类工具对比

特性RabbitMQKafkaNATSRedis Pub/Sub
消息模型队列+交换机日志流发布订阅发布订阅
消息持久化支持原生JetStream不支持
消息确认逐条确认偏移量确认
吞吐量万级/秒百万级/秒百万级/秒万级/秒
复杂路由极强
适合场景任务队列流处理微服务简单通知

选型建议:需要复杂路由和消息确认选 RabbitMQ;大规模日志/流处理选 Kafka;轻量级微服务通信选 NATS。


参考资料RabbitMQ 官方文档 | Pika 文档 | RabbitMQ 4.3 发布