613 NATS 高性能消息系统¶
一句话概述:NATS 是超轻量级的消息系统,单个二进制文件不到 20MB,亚毫秒延迟,每秒百万级消息,CNCF 孵化项目,是微服务通信的"高速公路"。
核心知识点速查表¶
| 知识点 | 说明 |
|---|---|
| 最新版本 | v2.10.x(2025年) |
| 核心模式 | Pub/Sub、Request/Reply、Queue Group |
| 持久化层 | JetStream(内置) |
| 语言 | Go 编写,单二进制部署 |
| 延迟 | 亚毫秒级 |
| 适用场景 | 微服务通信、IoT、边缘计算、命令控制 |
| 许可证 | Apache 2.0 |
一、安装配置¶
# Docker 安装
docker run -d --name nats \
-p 4222:4222 \ # 客户端端口
-p 8222:8222 \ # 监控端口
nats:latest -js # -js 启用 JetStream
# 二进制安装
curl -L https://github.com/nats-io/nats-server/releases/latest/download/nats-server-v2.10.22-linux-amd64.tar.gz | tar xz
sudo mv nats-server /usr/local/bin/
# 安装 NATS CLI
go install github.com/nats-io/natscli/nats@latest # 需要 Go 环境
# 或下载二进制: https://github.com/nats-io/natscli/releases
# 启动
nats-server -js # 启动服务(-js 启用 JetStream)
二、基本使用¶
2.1 发布/订阅(Python)¶
# pip install nats-py
import asyncio
import nats # 导入 NATS 异步客户端
async def main():
# 连接 NATS 服务器
nc = await nats.connect("nats://localhost:4222")
# 订阅(白话:监听某个主题的消息)
async def handler(msg):
print(f"收到 [{msg.subject}]: {msg.data.decode()}")
sub = await nc.subscribe("greetings", cb=handler) # 订阅 greetings 主题
# 发布
await nc.publish("greetings", b"Hello NATS!") # 发布消息
await nc.publish("greetings", b"你好 NATS!")
await asyncio.sleep(1) # 等待消息处理
await sub.unsubscribe() # 取消订阅
await nc.close() # 关闭连接
asyncio.run(main())
2.2 请求/回复模式¶
import asyncio
import nats
async def main():
nc = await nats.connect("nats://localhost:4222")
# 服务端:监听请求并回复
async def handler(msg):
data = msg.data.decode()
print(f"收到请求: {data}")
await msg.respond(f"已处理: {data}".encode()) # 回复请求者
await nc.subscribe("service.echo", cb=handler)
# 客户端:发送请求并等待回复
response = await nc.request(
"service.echo", # 请求主题
b"ping", # 请求内容
timeout=5.0 # 超时时间(秒)
)
print(f"收到回复: {response.data.decode()}")
await nc.close()
asyncio.run(main())
2.3 队列组(负载均衡)¶
# 白话:同一队列组内的多个消费者分摊消息(类似 Kafka 消费者组)
async def worker(nc, name):
async def handler(msg):
print(f"[{name}] 处理: {msg.data.decode()}")
await nc.subscribe(
"tasks", # 主题
queue="workers", # 队列组名
cb=handler
)
# 启动3个worker,消息会在它们之间均匀分配
三、JetStream(持久化消息)¶
import asyncio
import nats
from nats.js.api import StreamConfig, ConsumerConfig # JetStream 配置
async def main():
nc = await nats.connect("nats://localhost:4222")
js = nc.jetstream() # 获取 JetStream 上下文
# 创建 Stream(白话:持久化的消息流,消息会保存到磁盘)
await js.add_stream(
name="EVENTS", # Stream 名称
subjects=["events.>"], # 匹配的主题(> 是通配符)
retention="limits", # 保留策略
max_msgs=100000, # 最多保留10万条
max_age=86400_000_000_000 # 最多保留24小时(纳秒)
)
# 发布持久化消息
ack = await js.publish("events.user.login", b'{"user": "zhangsan"}')
print(f"消息已持久化, seq: {ack.seq}")
# 创建持久化消费者
sub = await js.pull_subscribe(
"events.>", # 订阅主题
durable="my-consumer" # 持久化消费者名(记住消费位置)
)
# 拉取消息
msgs = await sub.fetch(batch=10, timeout=5) # 拉取最多10条
for msg in msgs:
print(f"收到: {msg.data.decode()}")
await msg.ack() # 确认消息
await nc.close()
asyncio.run(main())
四、常见报错与解决¶
4.1 连接失败¶
解决:确认 NATS 服务运行中,端口 4222 可访问。
4.2 JetStream 未启用¶
解决:启动时加 -js 参数:nats-server -js。
4.3 消息超时¶
解决:增大 timeout 参数,或检查服务端是否有对应的订阅者。
五、速查表¶
| 操作 | NATS CLI 命令 |
|---|---|
| 发布消息 | nats pub greetings "hello" |
| 订阅消息 | nats sub greetings |
| 请求/回复 | nats request service.echo "ping" |
| 创建Stream | nats stream add EVENTS --subjects="events.>" |
| 查看Stream | nats stream ls |
| 消费消息 | nats consumer next EVENTS my-consumer |
| 服务器信息 | nats server info |
| 监控 | 访问 http://localhost:8222/varz |
六、同类工具对比¶
| 特性 | NATS | Kafka | RabbitMQ | Redis Pub/Sub |
|---|---|---|---|---|
| 延迟 | 亚毫秒 | 毫秒级 | 毫秒级 | 亚毫秒 |
| 吞吐量 | 极高 | 极高 | 中等 | 高 |
| 部署复杂度 | 极低 | 高 | 中 | 低 |
| 持久化 | JetStream | 原生 | 支持 | 不支持 |
| 内存占用 | 极低 | 高 | 中 | 低 |
| 适合场景 | 微服务/IoT | 大数据 | 企业集成 | 简单通知 |
选型建议:轻量级微服务/IoT通信首选 NATS(最简单最快);需要消息回放和流处理选 Kafka;需要复杂路由选 RabbitMQ。