跳转至

613 NATS 高性能消息系统

一句话概述:NATS 是超轻量级的消息系统,单个二进制文件不到 20MB,亚毫秒延迟,每秒百万级消息,CNCF 孵化项目,是微服务通信的"高速公路"。

核心知识点速查表

知识点说明
最新版本v2.10.x(2025年)
核心模式Pub/Sub、Request/Reply、Queue Group
持久化层JetStream(内置)
语言Go 编写,单二进制部署
延迟亚毫秒级
适用场景微服务通信、IoT、边缘计算、命令控制
许可证Apache 2.0

一、安装配置

# Docker 安装
docker run -d --name nats \
  -p 4222:4222 \                       # 客户端端口
  -p 8222:8222 \                       # 监控端口
  nats:latest -js                      # -js 启用 JetStream

# 二进制安装
curl -L https://github.com/nats-io/nats-server/releases/latest/download/nats-server-v2.10.22-linux-amd64.tar.gz | tar xz
sudo mv nats-server /usr/local/bin/

# 安装 NATS CLI
go install github.com/nats-io/natscli/nats@latest  # 需要 Go 环境
# 或下载二进制: https://github.com/nats-io/natscli/releases

# 启动
nats-server -js                        # 启动服务(-js 启用 JetStream)

二、基本使用

2.1 发布/订阅(Python)

# pip install nats-py
import asyncio
import nats  # 导入 NATS 异步客户端

async def main():
    # 连接 NATS 服务器
    nc = await nats.connect("nats://localhost:4222")

    # 订阅(白话:监听某个主题的消息)
    async def handler(msg):
        print(f"收到 [{msg.subject}]: {msg.data.decode()}")

    sub = await nc.subscribe("greetings", cb=handler)  # 订阅 greetings 主题

    # 发布
    await nc.publish("greetings", b"Hello NATS!")      # 发布消息
    await nc.publish("greetings", b"你好 NATS!")

    await asyncio.sleep(1)             # 等待消息处理
    await sub.unsubscribe()            # 取消订阅
    await nc.close()                   # 关闭连接

asyncio.run(main())

2.2 请求/回复模式

import asyncio
import nats

async def main():
    nc = await nats.connect("nats://localhost:4222")

    # 服务端:监听请求并回复
    async def handler(msg):
        data = msg.data.decode()
        print(f"收到请求: {data}")
        await msg.respond(f"已处理: {data}".encode())  # 回复请求者

    await nc.subscribe("service.echo", cb=handler)

    # 客户端:发送请求并等待回复
    response = await nc.request(
        "service.echo",               # 请求主题
        b"ping",                       # 请求内容
        timeout=5.0                    # 超时时间(秒)
    )
    print(f"收到回复: {response.data.decode()}")

    await nc.close()

asyncio.run(main())

2.3 队列组(负载均衡)

# 白话:同一队列组内的多个消费者分摊消息(类似 Kafka 消费者组)
async def worker(nc, name):
    async def handler(msg):
        print(f"[{name}] 处理: {msg.data.decode()}")

    await nc.subscribe(
        "tasks",                       # 主题
        queue="workers",               # 队列组名
        cb=handler
    )

# 启动3个worker,消息会在它们之间均匀分配

三、JetStream(持久化消息)

import asyncio
import nats
from nats.js.api import StreamConfig, ConsumerConfig  # JetStream 配置

async def main():
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()                # 获取 JetStream 上下文

    # 创建 Stream(白话:持久化的消息流,消息会保存到磁盘)
    await js.add_stream(
        name="EVENTS",                # Stream 名称
        subjects=["events.>"],         # 匹配的主题(> 是通配符)
        retention="limits",            # 保留策略
        max_msgs=100000,               # 最多保留10万条
        max_age=86400_000_000_000      # 最多保留24小时(纳秒)
    )

    # 发布持久化消息
    ack = await js.publish("events.user.login", b'{"user": "zhangsan"}')
    print(f"消息已持久化, seq: {ack.seq}")

    # 创建持久化消费者
    sub = await js.pull_subscribe(
        "events.>",                    # 订阅主题
        durable="my-consumer"          # 持久化消费者名(记住消费位置)
    )

    # 拉取消息
    msgs = await sub.fetch(batch=10, timeout=5)  # 拉取最多10条
    for msg in msgs:
        print(f"收到: {msg.data.decode()}")
        await msg.ack()                # 确认消息

    await nc.close()

asyncio.run(main())

四、常见报错与解决

4.1 连接失败

nats.errors.NoServersError: no servers available

解决:确认 NATS 服务运行中,端口 4222 可访问。

4.2 JetStream 未启用

nats.js.errors.ServiceUnavailableError

解决:启动时加 -js 参数:nats-server -js

4.3 消息超时

nats.errors.TimeoutError

解决:增大 timeout 参数,或检查服务端是否有对应的订阅者。


五、速查表

操作NATS CLI 命令
发布消息nats pub greetings "hello"
订阅消息nats sub greetings
请求/回复nats request service.echo "ping"
创建Streamnats stream add EVENTS --subjects="events.>"
查看Streamnats stream ls
消费消息nats consumer next EVENTS my-consumer
服务器信息nats server info
监控访问 http://localhost:8222/varz

六、同类工具对比

特性NATSKafkaRabbitMQRedis Pub/Sub
延迟亚毫秒毫秒级毫秒级亚毫秒
吞吐量极高极高中等
部署复杂度极低
持久化JetStream原生支持不支持
内存占用极低
适合场景微服务/IoT大数据企业集成简单通知

选型建议:轻量级微服务/IoT通信首选 NATS(最简单最快);需要消息回放和流处理选 Kafka;需要复杂路由选 RabbitMQ。


参考资料NATS 官网 | NATS 文档 | GitHub