跳转至

Apache Pulsar

一句话概述:Apache Pulsar 是下一代分布式消息和流处理平台,采用计算存储分离架构,原生支持多租户、跨地域复制和分层存储,适合超大规模、低延迟的消息处理场景,可视为 Kafka 的现代化替代方案。

核心知识点

概念白话解释
Topic主题 = 消息的逻辑通道(类似聊天室频道)
Producer/Consumer生产者/消费者 = 发送/接收消息的程序
Subscription订阅 = 消费者订阅主题的方式(4种模式)
Namespace命名空间 = 主题的逻辑分组(类似文件夹)
Tenant租户 = 多租户隔离(一个集群多个团队共用,互不干扰)
BookKeeper存储层 = 消息的持久化存储引擎(独立的存储集群)
Broker代理 = 无状态的消息路由服务(只负责路由,不存储)
BookieBookKeeper 节点 = 实际存储消息数据的节点
Pulsar Functions轻量级计算框架 = 无需额外框架直接在 Pulsar 内处理消息
Schema Registry消息格式注册中心 = 保证消息格式的一致性

白话解释:Pulsar vs Kafka

把 Kafka 比作一栋自建小楼:计算和存储都在同一台机器上,扩展时两者必须一起扩。

Pulsar 更像共享办公室 + 独立仓库: - Broker(共享办公室):只负责处理消息路由,可以独立扩展 - BookKeeper(独立仓库):只负责存储,可以独立扩展 - 这种分离让扩展更灵活,理论上存储容量无上限(配合分层存储到 S3)

安装配置

方式一:Docker 单机模式(最快上手)

# 拉取并启动 Pulsar standalone 模式(包含 broker + bookie + zookeeper)
docker run -d --name pulsar \
  -p 6650:6650 \   # 二进制协议端口(生产者/消费者使用)
  -p 8080:8080 \   # HTTP Admin API 端口
  apachepulsar/pulsar:latest \  # 最新版本
  bin/pulsar standalone          # standalone 模式,适合开发测试

# 验证启动成功
docker logs pulsar | tail -20    # 查看启动日志
curl http://localhost:8080/admin/v2/clusters  # 检查管理 API

方式二:Docker Compose(推荐开发环境)

# docker-compose.yml
version: '3.7'
services:
  pulsar:
    image: apachepulsar/pulsar:latest
    container_name: pulsar
    ports:
      - "6650:6650"   # Pulsar 消息协议
      - "8080:8080"   # Admin HTTP API
    command: bin/pulsar standalone
    environment:
      PULSAR_MEM: "-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256m"  # 内存限制

  # Pulsar Manager(可视化管理界面,可选)
  pulsar-manager:
    image: apachepulsar/pulsar-manager:latest
    container_name: pulsar-manager
    ports:
      - "9527:9527"   # Web UI
      - "7750:7750"   # API
    environment:
      SPRING_CONFIGURATION_FILE: /pulsar-manager/pulsar-manager/application.properties
# 启动
docker-compose up -d

# 停止
docker-compose down

方式三:二进制安装

# 下载最新版本
wget https://archive.apache.org/dist/pulsar/pulsar-3.3.0/apache-pulsar-3.3.0-bin.tar.gz

# 解压
tar xvfz apache-pulsar-3.3.0-bin.tar.gz
cd apache-pulsar-3.3.0

# 启动 standalone 模式
bin/pulsar standalone

# 在另一个终端验证
bin/pulsar-admin clusters list       # 列出集群

Python 客户端安装

# 安装 Python 客户端(最新版本 3.11.0)
pip install pulsar-client

# 安装带额外组件
pip install 'pulsar-client[avro]'       # Avro 序列化支持
pip install 'pulsar-client[functions]'  # Functions 运行时支持
pip install 'pulsar-client[all]'        # 安装所有可选组件

# 验证安装
python3 -c "import pulsar; print('Pulsar client 安装成功')"

基本使用

生产者:发送消息

import pulsar

# 创建客户端连接
client = pulsar.Client(
    'pulsar://localhost:6650',  # Pulsar 服务地址
    # 如果启用了 TLS
    # 'pulsar+ssl://localhost:6651',
    operation_timeout_seconds=30,  # 操作超时时间
)

# 创建生产者
producer = client.create_producer(
    'persistent://public/default/my-topic',  # 主题地址格式:persistent://租户/命名空间/主题名
    # producer_name='my-producer',   # 可选:自定义生产者名称
    # send_timeout_millis=30000,     # 发送超时(毫秒)
    # compression_type=pulsar.CompressionType.LZ4,  # 压缩类型
    # max_pending_messages=1000,     # 最大等待消息数
)

# 同步发送消息(等待确认)
msg_id = producer.send(
    'Hello Pulsar!'.encode('utf-8'),  # 消息内容必须是 bytes
    # partition_key='user-123',       # 分区键(相同 key 路由到同一分区)
    # properties={'type': 'greeting', 'lang': 'zh'},  # 消息属性(元数据)
)
print(f"消息已发送,ID: {msg_id}")  # MessageId 格式:ledger:entry:partition

# 异步发送(非阻塞,高吞吐量场景)
def on_send_complete(res, msg_id):
    if res == pulsar.Result.Ok:
        print(f"异步发送成功: {msg_id}")
    else:
        print(f"发送失败: {res}")

producer.send_async(
    '异步消息'.encode('utf-8'),
    callback=on_send_complete       # 发送完成后的回调函数
)

# 批量发送(自动聚合多条消息)
producer.flush()                   # 等待所有异步消息发送完成

# 关闭资源(重要!)
producer.close()
client.close()

消费者:接收消息

import pulsar

client = pulsar.Client('pulsar://localhost:6650')  # 连接 Pulsar

# 创建消费者(订阅主题)
consumer = client.subscribe(
    'persistent://public/default/my-topic',  # 订阅的主题
    'my-subscription',                        # 订阅名称(相同名称=同一订阅组)
    # consumer_type=pulsar.ConsumerType.Shared,  # 订阅类型(默认 Exclusive)
    # message_listener=None,                     # 消息监听回调(用于异步消费)
    # receiver_queue_size=1000,                  # 接收队列大小
    # consumer_name='my-consumer',               # 消费者名称
)

# 接收消息(同步,阻塞等待)
while True:
    try:
        msg = consumer.receive(timeout_millis=5000)  # 等待 5 秒

        # 处理消息内容
        content = msg.data().decode('utf-8')  # 解码消息内容
        properties = msg.properties()          # 获取消息属性(元数据)
        print(f"收到消息: {content}")
        print(f"消息属性: {properties}")
        print(f"消息 ID: {msg.message_id()}")
        print(f"发布时间: {msg.publish_timestamp()}")

        # 确认消息已处理(非常重要!不 ack 则消息会重新投递)
        consumer.acknowledge(msg)

    except Exception as e:
        if 'timeout' in str(e).lower():
            print("等待超时,继续等待...")
            continue
        else:
            # 处理失败,让消息重新投递
            consumer.negative_acknowledge(msg)  # 告诉 Pulsar 这条消息处理失败
            print(f"处理失败: {e}")

consumer.close()
client.close()

主题名称格式

persistent://tenant/namespace/topic

# 解释:
# persistent://  = 持久化主题(消息写入 BookKeeper 磁盘)
# non-persistent://  = 非持久化主题(消息只在内存,速度快但可能丢失)
# tenant         = 租户名(如 public)
# namespace      = 命名空间(如 default)
# topic          = 主题名(如 my-topic)

# 示例:
# persistent://my-company/payments/orders
# persistent://public/default/logs
# non-persistent://public/default/metrics

高级用法

四种订阅类型

# 1. Exclusive(独占)- 只有一个消费者,默认类型
consumer = client.subscribe(topic, 'sub', consumer_type=pulsar.ConsumerType.Exclusive)

# 2. Shared(共享)- 多个消费者轮询接收,适合扩展处理能力
consumer = client.subscribe(topic, 'sub', consumer_type=pulsar.ConsumerType.Shared)

# 3. Failover(故障转移)- 主消费者故障时自动切换到备用,保证顺序
consumer = client.subscribe(topic, 'sub', consumer_type=pulsar.ConsumerType.Failover)

# 4. Key_Shared(按 Key 共享)- 相同 key 的消息总是由同一消费者处理
consumer = client.subscribe(topic, 'sub', consumer_type=pulsar.ConsumerType.KeyShared)

Schema 支持(消息格式校验)

from pulsar.schema import AvroSchema, JsonSchema, Record, String, Integer, Boolean

# 定义消息格式(类似 Protobuf)
class UserEvent(Record):
    user_id = Integer()      # 用户 ID(整数类型)
    username = String()      # 用户名(字符串)
    is_active = Boolean()    # 是否激活(布尔值)

# 使用 Avro Schema 创建生产者(自动格式校验)
producer = client.create_producer(
    'persistent://public/default/user-events',
    schema=AvroSchema(UserEvent)  # 绑定 Schema
)

# 发送结构化消息(会自动序列化为 Avro)
producer.send(UserEvent(user_id=1001, username='张三', is_active=True))

# 使用 Schema 创建消费者(自动反序列化)
consumer = client.subscribe(
    'persistent://public/default/user-events',
    'user-processor',
    schema=AvroSchema(UserEvent)
)
msg = consumer.receive()
user = msg.value()        # 直接得到 UserEvent 对象
print(f"用户: {user.username}, ID: {user.user_id}")

消息读取器(Reader):从指定位置读取

# Reader 与 Consumer 的区别:
# Consumer:记录消费进度,支持多种订阅模式
# Reader:不记录进度,可以任意指定起始位置读取,适合回放历史消息

reader = client.create_reader(
    'persistent://public/default/my-topic',
    pulsar.MessageId.earliest,         # 从最早的消息开始读
    # pulsar.MessageId.latest,          # 从最新的消息开始读
    # specific_msg_id,                  # 从指定 MessageId 开始读
)

while reader.has_message_available():   # 检查是否还有消息
    msg = reader.read_next()
    print(f"读取: {msg.data().decode()}")

reader.close()

Pulsar Admin API:管理集群

# 使用 HTTP Admin API 管理 Pulsar
import requests

BASE_URL = "http://localhost:8080/admin/v2"

# 创建命名空间
requests.put(f"{BASE_URL}/namespaces/public/my-namespace")

# 获取主题列表
response = requests.get(f"{BASE_URL}/persistent/public/default")
print(response.json())  # 列出所有主题

# 获取主题统计信息
response = requests.get(
    f"{BASE_URL}/persistent/public/default/my-topic/stats"
)
stats = response.json()
print(f"消息总数: {stats['msgInCounter']}")
print(f"字节总数: {stats['bytesInCounter']}")

Pulsar Functions:流处理

# Pulsar Functions 是 Pulsar 内置的轻量级流处理框架
# 无需 Spark/Flink,直接在 Pulsar 内部处理消息

# 创建一个简单的 Function:将消息转为大写
# 文件:uppercase_function.py

def process(input, context):
    """Pulsar Function 处理函数"""
    # input: 输入消息内容
    # context: 上下文对象(包含日志、计数器等)

    context.get_logger().info(f"处理消息: {input}")    # 记录日志

    result = input.upper()  # 将消息转为大写
    return result            # 返回值会被自动发送到输出主题
# 部署 Function
bin/pulsar-admin functions create \
  --py uppercase_function.py \    # Python 文件
  --classname uppercase_function \  # 函数名
  --inputs persistent://public/default/input-topic \  # 输入主题
  --output persistent://public/default/output-topic \ # 输出主题
  --name uppercase-function        # Function 名称

# 查看 Function 状态
bin/pulsar-admin functions status --name uppercase-function

# 查看 Function 统计
bin/pulsar-admin functions stats --name uppercase-function

常见报错

报错信息原因解决方法
Connection refused 6650Pulsar 服务未启动docker start pulsar 或检查服务
Topic not found主题不存在生产者发消息时会自动创建,无需手动创建
Consumer is already subscribed独占订阅已被其他消费者占用使用 Shared 订阅,或换不同订阅名
Schema incompatible新 Schema 与已有 Schema 不兼容检查 Schema 字段类型,或使用兼容性策略
Message too large消息超过大小限制(默认 5MB)在 broker.conf 调大 maxMessageSize
Producer timeout发送超时,Broker 过载增加超时时间,或扩展 Broker
Not enough space on bookieBookKeeper 磁盘满清理数据或扩展存储
Authentication failed认证配置错误检查 token 或证书配置
Backlog quota exceeded积压配额超限增加消费速度或调整 backlog 策略

速查表

# ===== Pulsar Admin CLI =====

# 主题管理
pulsar-admin topics list public/default          # 列出所有主题
pulsar-admin topics stats <topic>                # 主题统计信息
pulsar-admin topics create <topic>               # 手动创建主题
pulsar-admin topics delete <topic>               # 删除主题
pulsar-admin topics peek-messages <topic> -n 10  # 查看最新 10 条消息

# 订阅管理
pulsar-admin topics subscriptions <topic>        # 列出订阅
pulsar-admin topics unsubscribe <topic> -s <sub> # 删除订阅

# 命名空间管理
pulsar-admin namespaces list public              # 列出命名空间
pulsar-admin namespaces create public/my-ns      # 创建命名空间
pulsar-admin namespaces policies public/default  # 查看命名空间策略

# 集群信息
pulsar-admin clusters list                       # 列出集群
pulsar-admin brokers list my-cluster             # 列出 Broker

# ===== 命令行测试 =====

# 发送测试消息
pulsar-client produce persistent://public/default/test-topic \
  -m "Hello" --num-produce 10   # 发送 10 条 Hello 消息

# 消费测试消息
pulsar-client consume persistent://public/default/test-topic \
  -s "test-sub" -n 10           # 消费 10 条消息

# ===== 四种订阅类型速查 =====
# Exclusive  : 独占,只允许一个消费者(有序保证)
# Shared     : 共享,多消费者轮询,提高吞吐
# Failover   : 主备,主挂了自动切换(有序保证)
# Key_Shared : 按 Key 分片,相同 key 有序,不同 key 并行

# ===== 主题类型 =====
# persistent://    消息持久化到 BookKeeper 磁盘(生产环境用)
# non-persistent:// 消息只在内存中(高速但可能丢失)

同类工具对比

特性Apache PulsarApache KafkaRabbitMQ
架构计算存储分离一体化一体化
多租户原生支持需要额外配置有限支持
跨地域复制原生支持MirrorMakerShovel 插件
消息回放任意位置按 offset不支持
分层存储原生支持(S3等)不支持不支持
延迟消息原生支持不支持插件支持
流处理Pulsar FunctionsKafka Streams不支持
社区生态中等非常大
学习曲线较陡中等
适用场景超大规模、多租户日志流处理任务队列

选型建议: - 如果已经用 Kafka 且规模不大,继续用 Kafka - 如果需要多租户隔离、超大规模或分层存储,选 Pulsar - 如果只是简单任务队列,RabbitMQ 更简单


参考:Apache Pulsar 官方文档 | Python 客户端文档 | PyPI pulsar-client | 更新于 2026 年