Apache Kafka 消息队列
一句话概述:Apache Kafka 是分布式事件流平台,像一个超大容量的"消息传送带",让不同系统之间高效可靠地传递数据。
核心知识点表
| 概念 | 白话解释 |
|---|
| Broker | Kafka服务器节点,多个Broker组成集群 |
| Topic | 消息的分类,比如"订单""日志"是不同的Topic |
| Partition | Topic的分区,数据分散到多个分区实现并行 |
| Producer | 生产者,往Kafka里发消息的程序 |
| Consumer | 消费者,从Kafka里读消息的程序 |
| Consumer Group | 消费者组,组内成员分工消费不同分区 |
| Offset | 偏移量,每条消息的序号,消费者靠它记住读到哪里了 |
| KRaft | Kafka 4.0开始取代ZooKeeper的自管理协议 |
| Share Group | 4.0新增的"队列模式",消息可以被组内任意消费者处理 |
版本信息(2026年5月)
- 最新版本:Kafka 4.2.0
- 重大变化:4.0起完全移除ZooKeeper,默认KRaft模式
- 新特性:Share Group队列语义、新一代消费者再平衡协议
安装配置
方式一:本地安装
# 前提:安装Java 17+
sudo apt install openjdk-17-jdk -y # Ubuntu安装Java
java -version # 验证Java版本
# 下载Kafka
wget https://downloads.apache.org/kafka/4.2.0/kafka_2.13-4.2.0.tgz
tar -xzf kafka_2.13-4.2.0.tgz # 解压
cd kafka_2.13-4.2.0 # 进入目录
# 生成集群ID
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)" # 生成随机UUID
# 格式化存储目录(KRaft模式,不再需要ZooKeeper)
bin/kafka-storage.sh format --standalone \
-t $KAFKA_CLUSTER_ID \
-c config/server.properties # 初始化存储
# 启动Kafka
bin/kafka-server-start.sh config/server.properties # 前台启动
# 或后台启动
bin/kafka-server-start.sh -daemon config/server.properties
方式二:Docker
# 使用官方Docker镜像
docker run -d \
--name kafka \
-p 9092:9092 \
apache/kafka:4.2.0
# 进入容器使用Kafka命令
docker exec -it kafka bash
Python客户端安装
pip install confluent-kafka # 推荐,性能最好
# 或
pip install kafka-python # 纯Python实现,简单易用
基本使用
命令行操作
# 创建Topic
bin/kafka-topics.sh --create \
--topic my-topic \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1 # 单节点设为1
# 列出所有Topic
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# 查看Topic详情
bin/kafka-topics.sh --describe \
--topic my-topic \
--bootstrap-server localhost:9092
# 命令行生产消息
bin/kafka-console-producer.sh \
--topic my-topic \
--bootstrap-server localhost:9092
# 然后输入消息,每行一条,Ctrl+C退出
# 命令行消费消息
bin/kafka-console-consumer.sh \
--topic my-topic \
--bootstrap-server localhost:9092 \
--from-beginning # 从头开始消费
Python生产者
# producer.py
from confluent_kafka import Producer # 导入生产者
import json
# 配置生产者
config = {
'bootstrap.servers': 'localhost:9092', # Kafka地址
'client.id': 'my-producer', # 客户端标识
}
producer = Producer(config) # 创建生产者实例
# 回调函数:消息发送后调用
def delivery_callback(err, msg):
if err:
print(f'消息发送失败: {err}') # 发送失败
else:
print(f'消息已发送到 {msg.topic()} 分区{msg.partition()} 偏移{msg.offset()}')
# 发送消息
for i in range(10):
data = {
"user_id": i,
"action": "login",
"timestamp": "2026-05-13T10:00:00",
}
producer.produce(
topic='user-events', # 发到哪个Topic
key=str(i).encode('utf-8'), # 消息键(决定发到哪个分区)
value=json.dumps(data).encode('utf-8'), # 消息值(JSON序列化)
callback=delivery_callback, # 发送回调
)
producer.flush() # 等待所有消息发送完毕
print("所有消息发送完成")
Python消费者
# consumer.py
from confluent_kafka import Consumer # 导入消费者
import json
# 配置消费者
config = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-consumer-group', # 消费者组ID
'auto.offset.reset': 'earliest', # 从最早的消息开始消费
'enable.auto.commit': True, # 自动提交偏移量
}
consumer = Consumer(config) # 创建消费者
# 订阅Topic
consumer.subscribe(['user-events']) # 订阅一个或多个Topic
# 消费消息循环
try:
while True:
msg = consumer.poll(timeout=1.0) # 拉取消息,等待最多1秒
if msg is None:
continue # 没有新消息
if msg.error():
print(f"消费错误: {msg.error()}")
continue
# 解析消息
key = msg.key().decode('utf-8') # 消息键
value = json.loads(msg.value().decode('utf-8')) # 消息值
print(f"收到消息 - 键:{key}, 值:{value}, "
f"分区:{msg.partition()}, 偏移:{msg.offset()}")
except KeyboardInterrupt:
print("停止消费")
finally:
consumer.close() # 关闭消费者
高级用法
精确一次语义(Exactly Once)
# 事务生产者:保证消息要么全部发送成功,要么全部不发
config = {
'bootstrap.servers': 'localhost:9092',
'transactional.id': 'my-transactional-producer', # 事务ID
}
producer = Producer(config)
producer.init_transactions() # 初始化事务
try:
producer.begin_transaction() # 开始事务
producer.produce('topic1', value=b'msg1')
producer.produce('topic2', value=b'msg2')
producer.commit_transaction() # 提交事务(两条消息一起成功)
except Exception as e:
producer.abort_transaction() # 回滚事务(两条消息都不发)
print(f"事务失败: {e}")
手动提交偏移量
# 关闭自动提交,手动控制
config = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'manual-commit-group',
'enable.auto.commit': False, # 关闭自动提交
}
consumer = Consumer(config)
consumer.subscribe(['events'])
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
continue
# 处理消息
process_message(msg)
# 处理成功后才提交偏移量(保证不丢消息)
consumer.commit(asynchronous=False) # 同步提交
多分区并行消费
# Kafka自动把分区分配给同一组内的消费者
# 3个分区 + 3个消费者 = 每个消费者处理1个分区
# consumer_1.py, consumer_2.py, consumer_3.py 代码完全相同
# 只要 group.id 相同,Kafka自动负载均衡
config = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'parallel-group', # 相同的group.id
}
常见报错与解决
| 报错信息 | 原因 | 解决方案 |
|---|
Connection refused | Kafka没启动 | 检查Kafka进程和端口9092 |
Topic not found | Topic不存在 | 创建Topic或开启auto.create.topics |
LEADER_NOT_AVAILABLE | 分区Leader还没选出来 | 等几秒重试,或检查Broker状态 |
Group coordinator not available | 消费者组协调器问题 | 等待Kafka完全启动 |
Message too large | 单条消息太大 | 调大message.max.bytes,或拆分消息 |
Consumer rebalance | 消费者加入/离开触发再平衡 | 正常现象,优化max.poll.interval.ms |
速查表
# ===== Topic管理 =====
kafka-topics.sh --create --topic T --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092
kafka-topics.sh --list --bootstrap-server localhost:9092
kafka-topics.sh --describe --topic T --bootstrap-server localhost:9092
kafka-topics.sh --delete --topic T --bootstrap-server localhost:9092
# ===== 消息收发 =====
kafka-console-producer.sh --topic T --bootstrap-server localhost:9092
kafka-console-consumer.sh --topic T --from-beginning --bootstrap-server localhost:9092
# ===== 消费者组 =====
kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
kafka-consumer-groups.sh --describe --group G --bootstrap-server localhost:9092
# ===== 关键配置 =====
# bootstrap.servers Kafka地址
# group.id 消费者组ID
# auto.offset.reset earliest/latest
# acks 0/1/all(消息可靠性级别)
# retries 重试次数
# batch.size 批量大小
同类工具对比
| 特性 | Kafka | RabbitMQ | Redis Streams | Pulsar |
|---|
| 吞吐量 | 极高(百万/秒) | 中等 | 高 | 极高 |
| 消息持久化 | 磁盘持久化 | 内存+磁盘 | 内存+持久化 | 磁盘持久化 |
| 消息回放 | 支持(核心优势) | 不支持 | 有限支持 | 支持 |
| 延迟 | 毫秒级 | 微秒级 | 微秒级 | 毫秒级 |
| 适合场景 | 大数据管道、事件流 | 任务队列、RPC | 轻量级消息 | 多租户流处理 |
面试建议:Kafka面试高频。重点理解:1)为什么Kafka快(顺序写磁盘+零拷贝+批量发送);2)Consumer Group如何实现负载均衡;3)如何保证消息不丢失(acks=all + 手动提交offset);4)Kafka 4.0为什么去掉ZooKeeper。