Apache Pulsar¶
一句话概述:Apache Pulsar 是下一代分布式消息和流处理平台,采用计算存储分离架构,原生支持多租户、跨地域复制和分层存储,适合超大规模、低延迟的消息处理场景,可视为 Kafka 的现代化替代方案。
核心知识点¶
| 概念 | 白话解释 |
|---|---|
| Topic | 主题 = 消息的逻辑通道(类似聊天室频道) |
| Producer/Consumer | 生产者/消费者 = 发送/接收消息的程序 |
| Subscription | 订阅 = 消费者订阅主题的方式(4种模式) |
| Namespace | 命名空间 = 主题的逻辑分组(类似文件夹) |
| Tenant | 租户 = 多租户隔离(一个集群多个团队共用,互不干扰) |
| BookKeeper | 存储层 = 消息的持久化存储引擎(独立的存储集群) |
| Broker | 代理 = 无状态的消息路由服务(只负责路由,不存储) |
| Bookie | BookKeeper 节点 = 实际存储消息数据的节点 |
| Pulsar Functions | 轻量级计算框架 = 无需额外框架直接在 Pulsar 内处理消息 |
| Schema Registry | 消息格式注册中心 = 保证消息格式的一致性 |
白话解释:Pulsar vs Kafka¶
把 Kafka 比作一栋自建小楼:计算和存储都在同一台机器上,扩展时两者必须一起扩。
Pulsar 更像共享办公室 + 独立仓库: - Broker(共享办公室):只负责处理消息路由,可以独立扩展 - BookKeeper(独立仓库):只负责存储,可以独立扩展 - 这种分离让扩展更灵活,理论上存储容量无上限(配合分层存储到 S3)
安装配置¶
方式一:Docker 单机模式(最快上手)¶
# 拉取并启动 Pulsar standalone 模式(包含 broker + bookie + zookeeper)
docker run -d --name pulsar \
-p 6650:6650 \ # 二进制协议端口(生产者/消费者使用)
-p 8080:8080 \ # HTTP Admin API 端口
apachepulsar/pulsar:latest \ # 最新版本
bin/pulsar standalone # standalone 模式,适合开发测试
# 验证启动成功
docker logs pulsar | tail -20 # 查看启动日志
curl http://localhost:8080/admin/v2/clusters # 检查管理 API
方式二:Docker Compose(推荐开发环境)¶
# docker-compose.yml
version: '3.7'
services:
pulsar:
image: apachepulsar/pulsar:latest
container_name: pulsar
ports:
- "6650:6650" # Pulsar 消息协议
- "8080:8080" # Admin HTTP API
command: bin/pulsar standalone
environment:
PULSAR_MEM: "-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256m" # 内存限制
# Pulsar Manager(可视化管理界面,可选)
pulsar-manager:
image: apachepulsar/pulsar-manager:latest
container_name: pulsar-manager
ports:
- "9527:9527" # Web UI
- "7750:7750" # API
environment:
SPRING_CONFIGURATION_FILE: /pulsar-manager/pulsar-manager/application.properties
方式三:二进制安装¶
# 下载最新版本
wget https://archive.apache.org/dist/pulsar/pulsar-3.3.0/apache-pulsar-3.3.0-bin.tar.gz
# 解压
tar xvfz apache-pulsar-3.3.0-bin.tar.gz
cd apache-pulsar-3.3.0
# 启动 standalone 模式
bin/pulsar standalone
# 在另一个终端验证
bin/pulsar-admin clusters list # 列出集群
Python 客户端安装¶
# 安装 Python 客户端(最新版本 3.11.0)
pip install pulsar-client
# 安装带额外组件
pip install 'pulsar-client[avro]' # Avro 序列化支持
pip install 'pulsar-client[functions]' # Functions 运行时支持
pip install 'pulsar-client[all]' # 安装所有可选组件
# 验证安装
python3 -c "import pulsar; print('Pulsar client 安装成功')"
基本使用¶
生产者:发送消息¶
import pulsar
# 创建客户端连接
client = pulsar.Client(
'pulsar://localhost:6650', # Pulsar 服务地址
# 如果启用了 TLS
# 'pulsar+ssl://localhost:6651',
operation_timeout_seconds=30, # 操作超时时间
)
# 创建生产者
producer = client.create_producer(
'persistent://public/default/my-topic', # 主题地址格式:persistent://租户/命名空间/主题名
# producer_name='my-producer', # 可选:自定义生产者名称
# send_timeout_millis=30000, # 发送超时(毫秒)
# compression_type=pulsar.CompressionType.LZ4, # 压缩类型
# max_pending_messages=1000, # 最大等待消息数
)
# 同步发送消息(等待确认)
msg_id = producer.send(
'Hello Pulsar!'.encode('utf-8'), # 消息内容必须是 bytes
# partition_key='user-123', # 分区键(相同 key 路由到同一分区)
# properties={'type': 'greeting', 'lang': 'zh'}, # 消息属性(元数据)
)
print(f"消息已发送,ID: {msg_id}") # MessageId 格式:ledger:entry:partition
# 异步发送(非阻塞,高吞吐量场景)
def on_send_complete(res, msg_id):
if res == pulsar.Result.Ok:
print(f"异步发送成功: {msg_id}")
else:
print(f"发送失败: {res}")
producer.send_async(
'异步消息'.encode('utf-8'),
callback=on_send_complete # 发送完成后的回调函数
)
# 批量发送(自动聚合多条消息)
producer.flush() # 等待所有异步消息发送完成
# 关闭资源(重要!)
producer.close()
client.close()
消费者:接收消息¶
import pulsar
client = pulsar.Client('pulsar://localhost:6650') # 连接 Pulsar
# 创建消费者(订阅主题)
consumer = client.subscribe(
'persistent://public/default/my-topic', # 订阅的主题
'my-subscription', # 订阅名称(相同名称=同一订阅组)
# consumer_type=pulsar.ConsumerType.Shared, # 订阅类型(默认 Exclusive)
# message_listener=None, # 消息监听回调(用于异步消费)
# receiver_queue_size=1000, # 接收队列大小
# consumer_name='my-consumer', # 消费者名称
)
# 接收消息(同步,阻塞等待)
while True:
try:
msg = consumer.receive(timeout_millis=5000) # 等待 5 秒
# 处理消息内容
content = msg.data().decode('utf-8') # 解码消息内容
properties = msg.properties() # 获取消息属性(元数据)
print(f"收到消息: {content}")
print(f"消息属性: {properties}")
print(f"消息 ID: {msg.message_id()}")
print(f"发布时间: {msg.publish_timestamp()}")
# 确认消息已处理(非常重要!不 ack 则消息会重新投递)
consumer.acknowledge(msg)
except Exception as e:
if 'timeout' in str(e).lower():
print("等待超时,继续等待...")
continue
else:
# 处理失败,让消息重新投递
consumer.negative_acknowledge(msg) # 告诉 Pulsar 这条消息处理失败
print(f"处理失败: {e}")
consumer.close()
client.close()
主题名称格式¶
persistent://tenant/namespace/topic
# 解释:
# persistent:// = 持久化主题(消息写入 BookKeeper 磁盘)
# non-persistent:// = 非持久化主题(消息只在内存,速度快但可能丢失)
# tenant = 租户名(如 public)
# namespace = 命名空间(如 default)
# topic = 主题名(如 my-topic)
# 示例:
# persistent://my-company/payments/orders
# persistent://public/default/logs
# non-persistent://public/default/metrics
高级用法¶
四种订阅类型¶
# 1. Exclusive(独占)- 只有一个消费者,默认类型
consumer = client.subscribe(topic, 'sub', consumer_type=pulsar.ConsumerType.Exclusive)
# 2. Shared(共享)- 多个消费者轮询接收,适合扩展处理能力
consumer = client.subscribe(topic, 'sub', consumer_type=pulsar.ConsumerType.Shared)
# 3. Failover(故障转移)- 主消费者故障时自动切换到备用,保证顺序
consumer = client.subscribe(topic, 'sub', consumer_type=pulsar.ConsumerType.Failover)
# 4. Key_Shared(按 Key 共享)- 相同 key 的消息总是由同一消费者处理
consumer = client.subscribe(topic, 'sub', consumer_type=pulsar.ConsumerType.KeyShared)
Schema 支持(消息格式校验)¶
from pulsar.schema import AvroSchema, JsonSchema, Record, String, Integer, Boolean
# 定义消息格式(类似 Protobuf)
class UserEvent(Record):
user_id = Integer() # 用户 ID(整数类型)
username = String() # 用户名(字符串)
is_active = Boolean() # 是否激活(布尔值)
# 使用 Avro Schema 创建生产者(自动格式校验)
producer = client.create_producer(
'persistent://public/default/user-events',
schema=AvroSchema(UserEvent) # 绑定 Schema
)
# 发送结构化消息(会自动序列化为 Avro)
producer.send(UserEvent(user_id=1001, username='张三', is_active=True))
# 使用 Schema 创建消费者(自动反序列化)
consumer = client.subscribe(
'persistent://public/default/user-events',
'user-processor',
schema=AvroSchema(UserEvent)
)
msg = consumer.receive()
user = msg.value() # 直接得到 UserEvent 对象
print(f"用户: {user.username}, ID: {user.user_id}")
消息读取器(Reader):从指定位置读取¶
# Reader 与 Consumer 的区别:
# Consumer:记录消费进度,支持多种订阅模式
# Reader:不记录进度,可以任意指定起始位置读取,适合回放历史消息
reader = client.create_reader(
'persistent://public/default/my-topic',
pulsar.MessageId.earliest, # 从最早的消息开始读
# pulsar.MessageId.latest, # 从最新的消息开始读
# specific_msg_id, # 从指定 MessageId 开始读
)
while reader.has_message_available(): # 检查是否还有消息
msg = reader.read_next()
print(f"读取: {msg.data().decode()}")
reader.close()
Pulsar Admin API:管理集群¶
# 使用 HTTP Admin API 管理 Pulsar
import requests
BASE_URL = "http://localhost:8080/admin/v2"
# 创建命名空间
requests.put(f"{BASE_URL}/namespaces/public/my-namespace")
# 获取主题列表
response = requests.get(f"{BASE_URL}/persistent/public/default")
print(response.json()) # 列出所有主题
# 获取主题统计信息
response = requests.get(
f"{BASE_URL}/persistent/public/default/my-topic/stats"
)
stats = response.json()
print(f"消息总数: {stats['msgInCounter']}")
print(f"字节总数: {stats['bytesInCounter']}")
Pulsar Functions:流处理¶
# Pulsar Functions 是 Pulsar 内置的轻量级流处理框架
# 无需 Spark/Flink,直接在 Pulsar 内部处理消息
# 创建一个简单的 Function:将消息转为大写
# 文件:uppercase_function.py
def process(input, context):
"""Pulsar Function 处理函数"""
# input: 输入消息内容
# context: 上下文对象(包含日志、计数器等)
context.get_logger().info(f"处理消息: {input}") # 记录日志
result = input.upper() # 将消息转为大写
return result # 返回值会被自动发送到输出主题
# 部署 Function
bin/pulsar-admin functions create \
--py uppercase_function.py \ # Python 文件
--classname uppercase_function \ # 函数名
--inputs persistent://public/default/input-topic \ # 输入主题
--output persistent://public/default/output-topic \ # 输出主题
--name uppercase-function # Function 名称
# 查看 Function 状态
bin/pulsar-admin functions status --name uppercase-function
# 查看 Function 统计
bin/pulsar-admin functions stats --name uppercase-function
常见报错¶
| 报错信息 | 原因 | 解决方法 |
|---|---|---|
Connection refused 6650 | Pulsar 服务未启动 | docker start pulsar 或检查服务 |
Topic not found | 主题不存在 | 生产者发消息时会自动创建,无需手动创建 |
Consumer is already subscribed | 独占订阅已被其他消费者占用 | 使用 Shared 订阅,或换不同订阅名 |
Schema incompatible | 新 Schema 与已有 Schema 不兼容 | 检查 Schema 字段类型,或使用兼容性策略 |
Message too large | 消息超过大小限制(默认 5MB) | 在 broker.conf 调大 maxMessageSize |
Producer timeout | 发送超时,Broker 过载 | 增加超时时间,或扩展 Broker |
Not enough space on bookie | BookKeeper 磁盘满 | 清理数据或扩展存储 |
Authentication failed | 认证配置错误 | 检查 token 或证书配置 |
Backlog quota exceeded | 积压配额超限 | 增加消费速度或调整 backlog 策略 |
速查表¶
# ===== Pulsar Admin CLI =====
# 主题管理
pulsar-admin topics list public/default # 列出所有主题
pulsar-admin topics stats <topic> # 主题统计信息
pulsar-admin topics create <topic> # 手动创建主题
pulsar-admin topics delete <topic> # 删除主题
pulsar-admin topics peek-messages <topic> -n 10 # 查看最新 10 条消息
# 订阅管理
pulsar-admin topics subscriptions <topic> # 列出订阅
pulsar-admin topics unsubscribe <topic> -s <sub> # 删除订阅
# 命名空间管理
pulsar-admin namespaces list public # 列出命名空间
pulsar-admin namespaces create public/my-ns # 创建命名空间
pulsar-admin namespaces policies public/default # 查看命名空间策略
# 集群信息
pulsar-admin clusters list # 列出集群
pulsar-admin brokers list my-cluster # 列出 Broker
# ===== 命令行测试 =====
# 发送测试消息
pulsar-client produce persistent://public/default/test-topic \
-m "Hello" --num-produce 10 # 发送 10 条 Hello 消息
# 消费测试消息
pulsar-client consume persistent://public/default/test-topic \
-s "test-sub" -n 10 # 消费 10 条消息
# ===== 四种订阅类型速查 =====
# Exclusive : 独占,只允许一个消费者(有序保证)
# Shared : 共享,多消费者轮询,提高吞吐
# Failover : 主备,主挂了自动切换(有序保证)
# Key_Shared : 按 Key 分片,相同 key 有序,不同 key 并行
# ===== 主题类型 =====
# persistent:// 消息持久化到 BookKeeper 磁盘(生产环境用)
# non-persistent:// 消息只在内存中(高速但可能丢失)
同类工具对比¶
| 特性 | Apache Pulsar | Apache Kafka | RabbitMQ |
|---|---|---|---|
| 架构 | 计算存储分离 | 一体化 | 一体化 |
| 多租户 | 原生支持 | 需要额外配置 | 有限支持 |
| 跨地域复制 | 原生支持 | MirrorMaker | Shovel 插件 |
| 消息回放 | 任意位置 | 按 offset | 不支持 |
| 分层存储 | 原生支持(S3等) | 不支持 | 不支持 |
| 延迟消息 | 原生支持 | 不支持 | 插件支持 |
| 流处理 | Pulsar Functions | Kafka Streams | 不支持 |
| 社区生态 | 中等 | 非常大 | 大 |
| 学习曲线 | 较陡 | 中等 | 低 |
| 适用场景 | 超大规模、多租户 | 日志流处理 | 任务队列 |
选型建议: - 如果已经用 Kafka 且规模不大,继续用 Kafka - 如果需要多租户隔离、超大规模或分层存储,选 Pulsar - 如果只是简单任务队列,RabbitMQ 更简单
参考:Apache Pulsar 官方文档 | Python 客户端文档 | PyPI pulsar-client | 更新于 2026 年