跳转至

Apache Kafka 消息队列

一句话概述:Apache Kafka 是分布式事件流平台,像一个超大容量的"消息传送带",让不同系统之间高效可靠地传递数据。

核心知识点表

概念白话解释
BrokerKafka服务器节点,多个Broker组成集群
Topic消息的分类,比如"订单""日志"是不同的Topic
PartitionTopic的分区,数据分散到多个分区实现并行
Producer生产者,往Kafka里发消息的程序
Consumer消费者,从Kafka里读消息的程序
Consumer Group消费者组,组内成员分工消费不同分区
Offset偏移量,每条消息的序号,消费者靠它记住读到哪里了
KRaftKafka 4.0开始取代ZooKeeper的自管理协议
Share Group4.0新增的"队列模式",消息可以被组内任意消费者处理

版本信息(2026年5月)

  • 最新版本:Kafka 4.2.0
  • 重大变化:4.0起完全移除ZooKeeper,默认KRaft模式
  • 新特性:Share Group队列语义、新一代消费者再平衡协议

安装配置

方式一:本地安装

# 前提:安装Java 17+
sudo apt install openjdk-17-jdk -y  # Ubuntu安装Java
java -version  # 验证Java版本

# 下载Kafka
wget https://downloads.apache.org/kafka/4.2.0/kafka_2.13-4.2.0.tgz
tar -xzf kafka_2.13-4.2.0.tgz  # 解压
cd kafka_2.13-4.2.0  # 进入目录

# 生成集群ID
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"  # 生成随机UUID

# 格式化存储目录(KRaft模式,不再需要ZooKeeper)
bin/kafka-storage.sh format --standalone \
    -t $KAFKA_CLUSTER_ID \
    -c config/server.properties  # 初始化存储

# 启动Kafka
bin/kafka-server-start.sh config/server.properties  # 前台启动
# 或后台启动
bin/kafka-server-start.sh -daemon config/server.properties

方式二:Docker

# 使用官方Docker镜像
docker run -d \
    --name kafka \
    -p 9092:9092 \
    apache/kafka:4.2.0

# 进入容器使用Kafka命令
docker exec -it kafka bash

Python客户端安装

pip install confluent-kafka  # 推荐,性能最好
# 或
pip install kafka-python     # 纯Python实现,简单易用

基本使用

命令行操作

# 创建Topic
bin/kafka-topics.sh --create \
    --topic my-topic \
    --bootstrap-server localhost:9092 \
    --partitions 3 \
    --replication-factor 1  # 单节点设为1

# 列出所有Topic
bin/kafka-topics.sh --list --bootstrap-server localhost:9092

# 查看Topic详情
bin/kafka-topics.sh --describe \
    --topic my-topic \
    --bootstrap-server localhost:9092

# 命令行生产消息
bin/kafka-console-producer.sh \
    --topic my-topic \
    --bootstrap-server localhost:9092
# 然后输入消息,每行一条,Ctrl+C退出

# 命令行消费消息
bin/kafka-console-consumer.sh \
    --topic my-topic \
    --bootstrap-server localhost:9092 \
    --from-beginning  # 从头开始消费

Python生产者

# producer.py
from confluent_kafka import Producer  # 导入生产者
import json

# 配置生产者
config = {
    'bootstrap.servers': 'localhost:9092',  # Kafka地址
    'client.id': 'my-producer',             # 客户端标识
}
producer = Producer(config)  # 创建生产者实例

# 回调函数:消息发送后调用
def delivery_callback(err, msg):
    if err:
        print(f'消息发送失败: {err}')  # 发送失败
    else:
        print(f'消息已发送到 {msg.topic()} 分区{msg.partition()} 偏移{msg.offset()}')

# 发送消息
for i in range(10):
    data = {
        "user_id": i,
        "action": "login",
        "timestamp": "2026-05-13T10:00:00",
    }
    producer.produce(
        topic='user-events',              # 发到哪个Topic
        key=str(i).encode('utf-8'),       # 消息键(决定发到哪个分区)
        value=json.dumps(data).encode('utf-8'),  # 消息值(JSON序列化)
        callback=delivery_callback,        # 发送回调
    )

producer.flush()  # 等待所有消息发送完毕
print("所有消息发送完成")

Python消费者

# consumer.py
from confluent_kafka import Consumer  # 导入消费者
import json

# 配置消费者
config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-consumer-group',  # 消费者组ID
    'auto.offset.reset': 'earliest',  # 从最早的消息开始消费
    'enable.auto.commit': True,       # 自动提交偏移量
}
consumer = Consumer(config)  # 创建消费者

# 订阅Topic
consumer.subscribe(['user-events'])  # 订阅一个或多个Topic

# 消费消息循环
try:
    while True:
        msg = consumer.poll(timeout=1.0)  # 拉取消息,等待最多1秒
        if msg is None:
            continue  # 没有新消息
        if msg.error():
            print(f"消费错误: {msg.error()}")
            continue

        # 解析消息
        key = msg.key().decode('utf-8')  # 消息键
        value = json.loads(msg.value().decode('utf-8'))  # 消息值
        print(f"收到消息 - 键:{key}, 值:{value}, "
              f"分区:{msg.partition()}, 偏移:{msg.offset()}")

except KeyboardInterrupt:
    print("停止消费")
finally:
    consumer.close()  # 关闭消费者

高级用法

精确一次语义(Exactly Once)

# 事务生产者:保证消息要么全部发送成功,要么全部不发
config = {
    'bootstrap.servers': 'localhost:9092',
    'transactional.id': 'my-transactional-producer',  # 事务ID
}
producer = Producer(config)
producer.init_transactions()  # 初始化事务

try:
    producer.begin_transaction()  # 开始事务
    producer.produce('topic1', value=b'msg1')
    producer.produce('topic2', value=b'msg2')
    producer.commit_transaction()  # 提交事务(两条消息一起成功)
except Exception as e:
    producer.abort_transaction()  # 回滚事务(两条消息都不发)
    print(f"事务失败: {e}")

手动提交偏移量

# 关闭自动提交,手动控制
config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'manual-commit-group',
    'enable.auto.commit': False,  # 关闭自动提交
}
consumer = Consumer(config)
consumer.subscribe(['events'])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        continue

    # 处理消息
    process_message(msg)

    # 处理成功后才提交偏移量(保证不丢消息)
    consumer.commit(asynchronous=False)  # 同步提交

多分区并行消费

# Kafka自动把分区分配给同一组内的消费者
# 3个分区 + 3个消费者 = 每个消费者处理1个分区

# consumer_1.py, consumer_2.py, consumer_3.py 代码完全相同
# 只要 group.id 相同,Kafka自动负载均衡
config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'parallel-group',  # 相同的group.id
}

常见报错与解决

报错信息原因解决方案
Connection refusedKafka没启动检查Kafka进程和端口9092
Topic not foundTopic不存在创建Topic或开启auto.create.topics
LEADER_NOT_AVAILABLE分区Leader还没选出来等几秒重试,或检查Broker状态
Group coordinator not available消费者组协调器问题等待Kafka完全启动
Message too large单条消息太大调大message.max.bytes,或拆分消息
Consumer rebalance消费者加入/离开触发再平衡正常现象,优化max.poll.interval.ms

速查表

# ===== Topic管理 =====
kafka-topics.sh --create --topic T --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092
kafka-topics.sh --list --bootstrap-server localhost:9092
kafka-topics.sh --describe --topic T --bootstrap-server localhost:9092
kafka-topics.sh --delete --topic T --bootstrap-server localhost:9092

# ===== 消息收发 =====
kafka-console-producer.sh --topic T --bootstrap-server localhost:9092
kafka-console-consumer.sh --topic T --from-beginning --bootstrap-server localhost:9092

# ===== 消费者组 =====
kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
kafka-consumer-groups.sh --describe --group G --bootstrap-server localhost:9092

# ===== 关键配置 =====
# bootstrap.servers    Kafka地址
# group.id             消费者组ID
# auto.offset.reset    earliest/latest
# acks                 0/1/all(消息可靠性级别)
# retries              重试次数
# batch.size           批量大小

同类工具对比

特性KafkaRabbitMQRedis StreamsPulsar
吞吐量极高(百万/秒)中等极高
消息持久化磁盘持久化内存+磁盘内存+持久化磁盘持久化
消息回放支持(核心优势)不支持有限支持支持
延迟毫秒级微秒级微秒级毫秒级
适合场景大数据管道、事件流任务队列、RPC轻量级消息多租户流处理

面试建议:Kafka面试高频。重点理解:1)为什么Kafka快(顺序写磁盘+零拷贝+批量发送);2)Consumer Group如何实现负载均衡;3)如何保证消息不丢失(acks=all + 手动提交offset);4)Kafka 4.0为什么去掉ZooKeeper。