编程 Kafka 4.0 彻底告别 ZooKeeper:KRaft 模式从原理到生产实战全解析

2026-06-20 12:00:08 +0800 CST views 18

Kafka 4.0 彻底告别 ZooKeeper:KRaft 模式从原理到生产实战全解析

前言

2025 年,Apache Kafka 4.0 正式发布,这不仅仅是一个大版本更新——它标志着一个时代的终结。从 Kafka 诞生之初就如影随形的 ZooKeeper 依赖,终于被彻底移除。KRaft(Kafka Raft)成为唯一的集群协调模式。

如果你还在用 Kafka 3.x 带着 ZooKeeper 跑集群,是时候认真面对迁移这件事了。本文不会给你泛泛而谈的概述,而是从 Raft 共识协议的底层原理出发,结合 Docker 部署实战、Java/Go 客户端代码、Exactly-Once 语义实现、性能调优和生产级最佳实践,给你一份真正能落地的东西。


一、为什么必须告别 ZooKeeper?

1.1 ZooKeeper 的原罪

ZooKeeper 在 Kafka 中的角色是元数据管理——Broker 注册、Topic 分区分配、Controller 选举等核心协调工作都依赖它。但这个"外挂"带来了太多问题:

运维复杂度翻倍。 你不仅要管理 Kafka 集群,还要管理一套 ZooKeeper 集群。两套系统的版本兼容、配置调优、监控告警、故障排查,工作量直接翻倍。更别提 ZooKeeper 自己的 Observer、Snapshot 事务日志清理这些坑。

元数据瓶颈。 Kafka 集群规模上去之后,Topic 和 Partition 数量动辄上万。ZooKeeper 的 Watch 机制在这种规模下表现糟糕——每次元数据变更都会触发大量 Watch 通知,Controller 成为热点,整个集群的元数据更新延迟飙升。这个问题在 10 万+ Partition 的集群上尤为明显。

Controller 故障恢复慢。 旧架构下,Controller 故障需要通过 ZooKeeper 选举新 Controller,新 Controller 要从 ZooKeeper 加载全部元数据,然后才能开始工作。集群越大,恢复越慢,几十秒甚至几分钟的不可用窗口在生产环境中是致命的。

架构层面的割裂。 元数据存储在 ZooKeeper,实际数据在 Broker,两者之间的状态同步是一个天然的脆弱点。你一定遇到过 ZooKeeper 和 Kafka 元数据不一致的噩梦——删了 Topic 但 ZooKeeper 里还留着,或者反过来。

1.2 KRaft 不是"换了个 ZooKeeper"

一个常见的误解是:KRaft 就是把 ZooKeeper 的功能内嵌到 Kafka 里了。这个理解太浅。

KRaft 的本质是重新定义了 Kafka 的元数据架构。在旧架构中,元数据是 ZooKeeper 的"外部状态",Kafka 只是使用者;在 KRaft 架构中,元数据是一条条日志记录,存储在 Kafka 自身的内部 Topic(@metadata)中,通过 Raft 协议在 Controller Quorum 之间复制。

这意味着:

  • 元数据变更变成了追加日志的操作,天然有序、可回放
  • Broker 通过拉取元数据日志来同步状态,替代了推模式的 Watch
  • Controller 不再需要"加载"元数据——它自己就是 Raft 日志的维护者

这不是简单的替换,这是架构范式的转变。


二、KRaft 核心原理:Raft 共识协议

2.1 Raft 协议三板斧

Raft 共识协议是 KRaft 的基石,由 Diego Ongaro 和 John Ousterhout 在 2014 年提出,以"易于理解"著称(相对于 Paxos)。它的核心机制有三个:

Leader Election(领导者选举)。 在任何时刻,Raft 集群中有且仅有一个 Leader 节点。所有写操作必须通过 Leader,Follower 只接受 Leader 的日志复制。当 Leader 故障时,Follower 通过超时机制触发选举,获得多数票的节点成为新 Leader。

选举过程的关键参数:

  • election.timeout.ms(默认 1000ms):Follower 在此时间内未收到 Leader 的心跳,发起选举
  • request.timeout.ms(默认 2000ms):选举请求的超时时间
  • 一个节点要赢得选举,必须获得 Quorum(多数)节点的投票

Log Replication(日志复制)。 Leader 收到写请求后,将其追加到自己的日志中,然后并行地发送给所有 Follower。当多数 Follower 确认写入后,这条日志就被认为是"已提交"(Committed),Leader 通知客户端写入成功。

这个过程保证了:

  • 已提交的日志不会丢失(多数节点已持久化)
  • 所有节点的日志顺序一致(Leader 只有一个,日志只能从 Leader 追加)

Safety(安全性保证)。 Raft 保证了几个关键的不变量:

  • Election Safety:每个 Term 最多一个 Leader
  • Leader Append-Only:Leader 只追加日志,不删除不覆盖
  • Log Matching:如果两条日志在同一 Index 上且 Term 相同,那么它们之前的所有日志也相同
  • Leader Completeness:如果一个日志被提交,那么后续所有 Leader 都会包含这条日志
  • State Machine Safety:任何节点在某个 Index 上应用的日志条目都相同

2.2 元数据日志机制

KRaft 中,所有的集群元数据变更——创建 Topic、修改配置、Broker 上下线、Partition Leader 切换——都被编码为一条条元数据日志记录(Metadata Record),追加到内部的 @metadata 日志中。

@metadata 日志结构(逻辑视图):
┌─────────┬──────────┬───────────────────────────────┐
│ Offset  │  Term    │  Record Type                  │
├─────────┼──────────┼───────────────────────────────┤
│   0     │    1     │  RegisterBrokerRecord(broker0) │
│   1     │    1     │  RegisterBrokerRecord(broker1) │
│   2     │    1     │  CreateTopicRecord(topicA)     │
│   3     │    1     │  PartitionRecord(topicA-0)     │
│   4     │    2     │  LeaderChangeRecord(broker1)   │
│  ...    │   ...    │  ...                           │
└─────────┴──────────┴───────────────────────────────┘

每个 Broker 维护一个本地的元数据日志拉取偏移量。Broker 定期从 Controller 拉取自己缺失的元数据记录,然后重放到本地的元数据缓存中。这种拉模型(Pull Model)相比 ZooKeeper 的推模型(Push/Watch)有几个优势:

  1. 背压控制:Broker 可以按自己的节奏消费元数据,不会被海量 Watch 通知淹没
  2. 断点续传:Broker 重启后从上次的位置继续拉取,不需要全量加载
  3. 顺序保证:日志天然有序,不存在乱序更新的问题

2.3 Controller Quorum 与投票者机制

KRaft 集群中的 Controller 节点组成一个 Raft Quorum。通常部署 3 个或 5 个 Controller 节点,其中一个是活跃的 Leader Controller,其余是 Follower Controller。

投票者(Voter)机制是 KRaft 的一个重要设计。并非所有 Controller 都必须是投票者——你可以配置 Observer Controller,它们参与日志复制但不参与投票:

# server.properties - Controller 节点配置
node.id=1
process.roles=controller
controller.quorum.voters=1@host1:9093,2@host2:9093,3@host3:9093
controller.listener.names=CONTROLLER

# Observer Controller(不参与投票,只复制日志)
# 在 KRaft 中,可以通过不加入 voters 列表实现

Quorum 大小的选择遵循 Raft 的通用原则:N 个投票者可以容忍 (N-1)/2 个故障

投票者数量容忍故障数推荐场景
31中小规模集群
52大规模关键集群

2.4 合并模式 vs 分离模式

KRaft 提供两种部署模式:

Combined 模式(合并模式)。 Broker 和 Controller 运行在同一个进程中。适合小规模集群或开发测试环境。

# server.properties - Combined 模式
process.roles=broker,controller
node.id=1
listeners=BROKER://:9092,CONTROLLER://:9093
controller.quorum.voters=1@localhost:9093,2@localhost:9095,3@localhost:9097
advertised.listeners=BROKER://localhost:9092

Separated 模式(分离模式)。 Broker 和 Controller 运行在不同的进程中,甚至不同的机器上。适合生产环境,职责隔离更清晰。

# Controller 节点 - controller.properties
process.roles=controller
node.id=1
listeners=CONTROLLER://:9093
controller.quorum.voters=1@ctrl1:9093,2@ctrl2:9093,3@ctrl3:9093

# Broker 节点 - broker.properties
process.roles=broker
node.id=4
listeners=BROKER://:9092
controller.quorum.voters=1@ctrl1:9093,2@ctrl2:9093,3@ctrl3:9093
advertised.listeners=BROKER://broker1:9092

生产环境强烈推荐分离模式。 原因很简单:Controller 负责元数据管理,对延迟敏感;Broker 负责数据读写,可能因为大流量导致 GC 或 CPU 飙升。合并在一起,Broker 的压力会直接影响 Controller 的稳定性,导致元数据操作延迟甚至超时。

2.5 Controller 与 Broker 职责分离

KRaft 架构下,Controller 和 Broker 的职责边界更加清晰:

Controller 的职责:

  • 维护 Raft 日志和元数据状态机
  • 处理管理操作(创建/删除 Topic、修改配置)
  • 触发 Partition Leader 选举
  • 监控 Broker 健康状态
  • 向 Broker 推送元数据变更通知

Broker 的职责:

  • 处理客户端的数据读写请求
  • 存储消息数据
  • 参与消费者组的协调
  • 向 Controller 报告自身状态

关键变化:Broker 不再直接与 ZooKeeper 交互,所有元数据操作都通过 Controller 完成。这简化了 Broker 的代码路径,也消除了元数据不一致的根源。


三、Kafka 4.0 新特性详解

3.1 ZooKeeper 完全移除

这是最大的变更。Kafka 4.0 中:

  • 所有 ZooKeeper 相关的配置参数和命令行工具被移除
  • --zookeeper 参数不再被任何脚本接受
  • ZooKeeper 模式的 Broker 无法启动
  • 迁移模式(从 ZooKeeper 迁移到 KRaft 的过渡模式)不再支持

如果你还在 Kafka 3.x 上使用 ZooKeeper 模式,升级到 4.0 之前必须先迁移到 KRaft 模式。没有捷径,没有回退。

3.2 新一代消费者重平衡协议

Kafka 4.0 将 Incremental Cooperative Rebalancing(增量协作重平衡)作为默认的消费者重平衡协议,替代了之前的 Eager(急切)协议。

旧协议的问题: Eager 重平衡时,所有消费者先 revoke 所有分区,然后重新分配。这导致整个消费者组在重平衡期间完全停止消费,即使最终分配结果没有变化。

新协议的改进:

  1. 只 revoke 需要移动的分区,未受影响的分区继续消费
  2. 分多轮完成重平衡,逐步调整分配
  3. 大幅减少"Stop-The-World"的时间窗口
// Kafka 4.0 中默认启用,无需额外配置
// 如果你需要显式指定:
properties.put("consumer.group.protocol", "consumer"); // 新协议
// 旧协议已移除,不可回退

3.3 点对点消息模型

Kafka 4.0 引入了更灵活的消费模式支持,允许更精细地控制分区与消费者的映射关系。这在某些场景下提供了类似点对点消息模型的能力,减少了不必要的分区复制开销。

3.4 移除旧协议 API

Kafka 4.0 移除了一批过时的 API 和功能:

  • 旧版消费者 API(kafka.consumer.SimpleConsumer
  • 旧版生产者 API(kafka.producer.Producer
  • 基于 ZooKeeper 的管理工具
  • 部分废弃的配置参数

这是 Kafka 社区的一贯风格——大版本不向后兼容,逼着你用新 API。虽然痛苦,但换来的是更干净的代码库和更简单的维护。


四、架构对比分析

4.1 旧架构 vs KRaft 架构

旧架构(ZooKeeper 模式):
┌──────────┐     ┌──────────┐     ┌──────────┐
│ Broker 0 │     │ Broker 1 │     │ Broker 2 │
└────┬─────┘     └────┬─────┘     └────┬─────┘
     │                │                │
     └────────────────┼────────────────┘
                      │ Watch/通知
              ┌───────▼───────┐
              │   ZooKeeper   │
              │   Ensemble    │
              └───────────────┘

KRaft 架构:
┌──────────┐     ┌──────────┐     ┌──────────┐
│ Broker 0 │     │ Broker 1 │     │ Broker 2 │
└────┬─────┘     └────┬─────┘     └────┬─────┘
     │                │                │
     │     拉取元数据日志              │
     └────────────────┼────────────────┘
                      │
        ┌─────────────▼──────────────┐
        │   Controller Quorum        │
        │  ┌───────┐  ┌───────┐     │
        │  │Ctrl 0 │  │Ctrl 1 │     │
        │  │Leader │  │Follower│    │
        │  └───────┘  └───────┘     │
        │         ┌───────┐         │
        │         │Ctrl 2 │         │
        │         │Follower│        │
        │         └───────┘         │
        └───────────────────────────┘

4.2 关键指标对比

维度ZooKeeper 模式KRaft 模式
组件数量Kafka + ZooKeeper仅 Kafka
Controller 故障恢复10-60 秒1-5 秒
Partition 规模上限~10 万~200 万(理论值)
元数据更新机制Watch 推送日志拉取
运维复杂度高(双系统)低(单系统)
元数据一致性可能不一致Raft 保证一致
集群启动时间分钟级秒级

五、Docker 部署实战

5.1 单机部署(Combined 模式)

适合本地开发和测试。

# docker-compose-single.yaml
version: '3.8'

services:
  kafka:
    image: apache/kafka:4.0.0
    container_name: kafka-single
    ports:
      - "9092:9092"
      - "9093:9093"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: BROKER://:9092,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: BROKER://localhost:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-single:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      # 日志目录
      KAFKA_LOG_DIRS: /var/lib/kafka/data
    volumes:
      - kafka-data:/var/lib/kafka/data

volumes:
  kafka-data:

启动并验证:

# 启动
docker compose -f docker-compose-single.yaml up -d

# 等待启动完成
sleep 10

# 创建 Topic
docker exec kafka-single /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --create --topic test-topic \
  --partitions 3 --replication-factor 1

# 查看集群元数据
docker exec kafka-single /opt/kafka/bin/kafka-metadata.sh \
  --snapshot /var/lib/kafka/data/meta.properties

# 生产消息
docker exec kafka-single /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic test-topic

# 消费消息
docker exec kafka-single /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic test-topic --from-beginning

5.2 三节点集群部署(Separated 模式)

生产级部署推荐方案:

# docker-compose-cluster.yaml
version: '3.8'

networks:
  kafka-net:
    driver: bridge

services:
  kafka-controller-1:
    image: apache/kafka:4.0.0
    container_name: kafka-ctrl-1
    networks: [kafka-net]
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: controller
      KAFKA_LISTENERS: CONTROLLER://:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-ctrl-1:9093,2@kafka-ctrl-2:9093,3@kafka-ctrl-3:9093
      KAFKA_LOG_DIRS: /var/lib/kafka/data
    volumes:
      - ctrl1-data:/var/lib/kafka/data

  kafka-controller-2:
    image: apache/kafka:4.0.0
    container_name: kafka-ctrl-2
    networks: [kafka-net]
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: controller
      KAFKA_LISTENERS: CONTROLLER://:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-ctrl-1:9093,2@kafka-ctrl-2:9093,3@kafka-ctrl-3:9093
      KAFKA_LOG_DIRS: /var/lib/kafka/data
    volumes:
      - ctrl2-data:/var/lib/kafka/data

  kafka-controller-3:
    image: apache/kafka:4.0.0
    container_name: kafka-ctrl-3
    networks: [kafka-net]
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_PROCESS_ROLES: controller
      KAFKA_LISTENERS: CONTROLLER://:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-ctrl-1:9093,2@kafka-ctrl-2:9093,3@kafka-ctrl-3:9093
      KAFKA_LOG_DIRS: /var/lib/kafka/data
    volumes:
      - ctrl3-data:/var/lib/kafka/data

  kafka-broker-1:
    image: apache/kafka:4.0.0
    container_name: kafka-broker-1
    networks: [kafka-net]
    ports:
      - "9092:9092"
    depends_on:
      - kafka-controller-1
      - kafka-controller-2
      - kafka-controller-3
    environment:
      KAFKA_NODE_ID: 4
      KAFKA_PROCESS_ROLES: broker
      KAFKA_LISTENERS: BROKER://:9092
      KAFKA_ADVERTISED_LISTENERS: BROKER://kafka-broker-1:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-ctrl-1:9093,2@kafka-ctrl-2:9093,3@kafka-ctrl-3:9093
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
      KAFKA_NUM_NETWORK_THREADS: 4
      KAFKA_NUM_IO_THREADS: 8
      KAFKA_SOCKET_SEND_BUFFER_BYTES: 102400
      KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 102400
      KAFKA_LOG_RETENTION_HOURS: 168
      KAFKA_LOG_SEGMENT_BYTES: 1073741824
      KAFKA_NUM_PARTITIONS: 3
    volumes:
      - broker1-data:/var/lib/kafka/data

  kafka-broker-2:
    image: apache/kafka:4.0.0
    container_name: kafka-broker-2
    networks: [kafka-net]
    ports:
      - "9094:9092"
    depends_on:
      - kafka-controller-1
      - kafka-controller-2
      - kafka-controller-3
    environment:
      KAFKA_NODE_ID: 5
      KAFKA_PROCESS_ROLES: broker
      KAFKA_LISTENERS: BROKER://:9092
      KAFKA_ADVERTISED_LISTENERS: BROKER://kafka-broker-2:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-ctrl-1:9093,2@kafka-ctrl-2:9093,3@kafka-ctrl-3:9093
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
    volumes:
      - broker2-data:/var/lib/kafka/data

  kafka-broker-3:
    image: apache/kafka:4.0.0
    container_name: kafka-broker-3
    networks: [kafka-net]
    ports:
      - "9095:9092"
    depends_on:
      - kafka-controller-1
      - kafka-controller-2
      - kafka-controller-3
    environment:
      KAFKA_NODE_ID: 6
      KAFKA_PROCESS_ROLES: broker
      KAFKA_LISTENERS: BROKER://:9092
      KAFKA_ADVERTISED_LISTENERS: BROKER://kafka-broker-3:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-ctrl-1:9093,2@kafka-ctrl-2:9093,3@kafka-ctrl-3:9093
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
    volumes:
      - broker3-data:/var/lib/kafka/data

volumes:
  ctrl1-data:
  ctrl2-data:
  ctrl3-data:
  broker1-data:
  broker2-data:
  broker3-data:

5.3 关键配置参数详解

# ========== 核心身份配置 ==========
# 节点 ID,集群内唯一
node.id=1
# 进程角色:broker, controller, 或 broker,controller
process.roles=broker,controller

# ========== 监听器配置 ==========
# 监听器列表
listeners=BROKER://:9092,CONTROLLER://:9093
# 对外暴露的地址
advertised.listeners=BROKER://host1:9092
# 监听器名称映射
controller.listener.names=CONTROLLER
# 安全协议映射
listener.security.protocol.map=CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT

# ========== Controller Quorum 配置 ==========
# Quorum 投票者列表,格式:id1@host1:port1,id2@host2:port2,...
controller.quorum.voters=1@ctrl1:9093,2@ctrl2:9093,3@ctrl3:9093
# Leader 选举超时
controller.quorum.election.timeout.ms=1000
# 选举请求超时
controller.quorum.request.timeout.ms=2000
# Follower 拉取超时
controller.quorum.fetch.timeout.ms=2000

# ========== 存储配置 ==========
log.dirs=/var/lib/kafka/data
log.retention.hours=168
log.segment.bytes=1073741824
log.segment.delete.delay.ms=60000

# ========== 复制配置 ==========
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
default.replication.factor=3
min.insync.replicas=2

5.4 故障切换测试

部署完成后,验证集群的高可用性:

# 1. 创建测试 Topic
docker exec kafka-broker-1 /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server kafka-broker-1:9092 \
  --create --topic ha-test \
  --partitions 6 --replication-factor 3

# 2. 查看分区 Leader 分布
docker exec kafka-broker-1 /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server kafka-broker-1:9092 \
  --describe --topic ha-test

# 3. 模拟 Broker 故障
docker stop kafka-broker-1

# 4. 再次查看分区 Leader(应该自动切换到其他 Broker)
docker exec kafka-broker-2 /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server kafka-broker-2:9092 \
  --describe --topic ha-test

# 5. 模拟 Controller 故障
docker stop kafka-ctrl-1

# 6. 验证集群仍可正常工作
docker exec kafka-broker-2 /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server kafka-broker-2:9092 \
  --list

# 7. 恢复节点
docker start kafka-ctrl-1
docker start kafka-broker-1

六、客户端代码示例

6.1 Java(Spring Boot + Kafka)

Maven 依赖:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <version>3.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>3.3.0</version>
    </dependency>
</dependencies>

生产者配置:

package com.example.kafka.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        
        // 性能优化参数
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);        // 32KB 批次
        config.put(ProducerConfig.LINGER_MS_CONFIG, 10);             // 等待 10ms 凑批
        config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");   // LZ4 压缩
        config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864L);  // 64MB 缓冲区
        config.put(ProducerConfig.ACKS_CONFIG, "all");               // 等待所有 ISR 确认
        config.put(ProducerConfig.RETRIES_CONFIG, 3);               // 重试 3 次
        config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 幂等生产者
        
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

消费者配置:

package com.example.kafka.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-demo-group");
        
        // 手动提交 Offset
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        // 性能优化
        config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);       // 最少拉取 1KB
        config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);       // 单次最多 500 条
        config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);   // 会话超时 30s
        config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 处理超时 5min
        
        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setConcurrency(3); // 3 个消费线程
        return factory;
    }
}

生产者服务:

package com.example.kafka.service;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;

@Slf4j
@Service
@RequiredArgsConstructor
public class MessageProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    /**
     * 同步发送消息
     */
    public SendResult<String, String> sendSync(String topic, String key, String message) {
        try {
            CompletableFuture<SendResult<String, String>> future = 
                kafkaTemplate.send(topic, key, message);
            return future.get(); // 阻塞等待确认
        } catch (Exception e) {
            log.error("同步发送消息失败: topic={}, key={}", topic, key, e);
            throw new RuntimeException("消息发送失败", e);
        }
    }

    /**
     * 异步发送消息(带回调)
     */
    public void sendAsync(String topic, String key, String message) {
        CompletableFuture<SendResult<String, String>> future = 
            kafkaTemplate.send(topic, key, message);
        
        future.whenComplete((result, ex) -> {
            if (ex != null) {
                log.error("异步发送消息失败: topic={}, key={}, partition={}", 
                    topic, key, 
                    result != null ? result.getRecordMetadata().partition() : "N/A", 
                    ex);
            } else {
                log.debug("消息发送成功: topic={}, partition={}, offset={}", 
                    topic, 
                    result.getRecordMetadata().partition(), 
                    result.getRecordMetadata().offset());
            }
        });
    }
}

消费者服务(手动提交 Offset):

package com.example.kafka.service;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class MessageConsumer {

    @KafkaListener(topics = "test-topic", groupId = "kafka-demo-group")
    public void consume(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        try {
            log.info("收到消息: topic={}, partition={}, offset={}, key={}, value={}", 
                record.topic(), 
                record.partition(), 
                record.offset(), 
                record.key(), 
                record.value());
            
            // 处理业务逻辑
            processMessage(record.value());
            
            // 手动提交 Offset(处理成功后才提交)
            acknowledgment.acknowledge();
            log.debug("Offset 已提交: partition={}, offset={}", 
                record.partition(), record.offset());
                
        } catch (Exception e) {
            log.error("消息处理失败: partition={}, offset={}", 
                record.partition(), record.offset(), e);
            // 不提交 Offset,下次重新消费
            // 可以加入死信队列逻辑
        }
    }

    private void processMessage(String message) {
        // 你的业务逻辑
    }
}

6.2 Go 客户端

使用 github.com/IBM/sarama 库(Kafka 4.0 兼容):

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"github.com/IBM/sarama"
)

// ==================== 生产者 ====================

func main() {
	producer, err := createProducer()
	if err != nil {
		log.Fatalf("创建生产者失败: %v", err)
	}
	defer producer.Close()

	// 发送消息
	for i := 0; i < 100; i++ {
		key := fmt.Sprintf("key-%d", i)
		value := fmt.Sprintf("message-%d at %s", i, time.Now().Format(time.RFC3339))
		
		partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
			Topic: "test-topic",
			Key:   sarama.StringEncoder(key),
			Value: sarama.StringEncoder(value),
		})
		if err != nil {
			log.Printf("发送消息失败: %v", err)
			continue
		}
		log.Printf("消息已发送: partition=%d, offset=%d, key=%s", partition, offset, key)
	}
}

func createProducer() (sarama.SyncProducer, error) {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Producer.RequiredAcks = sarama.WaitForAll  // 等待所有 ISR 确认
	config.Producer.Retry.Max = 3
	config.Producer.Partitioner = sarama.NewHashPartitioner
	
	// 性能优化
	config.Producer.Flush.Messages = 100       // 凑满 100 条发送
	config.Producer.Flush.Frequency = 10 * time.Millisecond // 或等待 10ms
	config.Producer.Compression = sarama.CompressionLZ4
	config.Net.MaxOpenRequests = 5

	brokers := []string{"localhost:9092"}
	return sarama.NewSyncProducer(brokers, config)
}

// ==================== 消费者 ====================

func startConsumer() {
	config := sarama.NewConfig()
	config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{
		sarama.NewBalanceStrategyRange(), // 或 NewBalanceStrategyRoundRobin()
	}
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	config.Consumer.Offsets.Retry.Max = 3
	config.Consumer.Fetch.Min = 1024       // 最少 1KB
	config.Consumer.Fetch.Default = 1048576 // 默认 1MB
	config.Consumer.MaxProcessingTime = 100 * time.Millisecond

	brokers := []string{"localhost:9092"}
	group, err := sarama.NewConsumerGroup(brokers, "kafka-demo-group", config)
	if err != nil {
		log.Fatalf("创建消费者组失败: %v", err)
	}
	defer group.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	handler := &consumerGroupHandler{}

	// 优雅退出
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	go func() {
		for {
			if err := group.Consume(ctx, []string{"test-topic"}, handler); err != nil {
				log.Printf("消费者组错误: %v", err)
			}
			if ctx.Err() != nil {
				return
			}
		}
	}()

	<-sigchan
	cancel()
	log.Println("消费者已关闭")
}

type consumerGroupHandler struct{}

func (h *consumerGroupHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (h *consumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (h *consumerGroupHandler) ConsumeClaim(
	session sarama.ConsumerGroupSession,
	claim sarama.ConsumerGroupClaim,
) error {
	for message := range claim.Messages() {
		log.Printf("收到消息: topic=%s, partition=%d, offset=%d, key=%s, value=%s",
			message.Topic, message.Partition, message.Offset,
			string(message.Key), string(message.Value))

		// 处理业务逻辑
		if err := processMessage(string(message.Value)); err != nil {
			log.Printf("消息处理失败: offset=%d, err=%v", message.Offset, err)
			// 不提交 Offset,等待重新消费
			continue
		}

		// 手动提交 Offset
		session.MarkMessage(message, "")
		session.Commit()
	}
	return nil
}

func processMessage(value string) error {
	// 你的业务逻辑
	return nil
}

七、Exactly-Once 语义实现

Exactly-Once 是 Kafka 最强大的特性之一,也是最容易用错的。Kafka 4.0 中,Exactly-Once 语义的实现依赖三个机制的配合:

7.1 幂等生产者(Idempotent Producer)

幂等生产者保证单分区内消息不重复。开启后,Kafka 为每个生产者分配一个 Producer ID(PID)和递增的 Sequence Number,Broker 据此去重:

// 开启幂等生产者(Kafka 3.0+ 默认开启)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// 等价于:
// acks=all
// retries=Integer.MAX_VALUE
// max.in.flight.requests.per.connection <= 5

注意: 幂等生产者只保证单分区内的去重,跨分区不保证。而且只在单个 Producer 实例的生命周期内有效——Producer 重启后获得新的 PID,无法去重之前的消息。

7.2 事务(Transactions)

事务保证跨分区、跨 Topic 的原子写入。配合消费者的事务读取,可以实现端到端的 Exactly-Once:

// 事务生产者配置
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-producer-1"); // 必须!
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.ACKS_CONFIG, "all");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// 初始化事务(只需一次)
producer.initTransactions();

try {
    // 开始事务
    producer.beginTransaction();
    
    // 发送多条消息到不同 Topic
    producer.send(new ProducerRecord<>("topic-a", "key1", "value1"));
    producer.send(new ProducerRecord<>("topic-b", "key2", "value2"));
    
    // 提交消费者的 Offset(消费-处理-生产 模式)
    producer.sendOffsetsToTransaction(
        offsets,  // Map<TopicPartition, OffsetAndMetadata>
        consumerGroupId
    );
    
    // 提交事务
    producer.commitTransaction();
    
} catch (ProducerFencedException e) {
    // 另一个具有相同 transactional.id 的生产者已启动
    producer.close();
} catch (KafkaException e) {
    // 中止事务
    producer.abortTransaction();
}

7.3 消费者端配置

// 事务消费者配置
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "tx-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // 只读已提交的消息
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 手动提交

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

isolation.level=read_committed 是关键——它确保消费者只读到已提交事务的消息,不会读到中止事务的"脏数据"。

7.4 Exactly-Once 的陷阱

  1. transactional.id 必须唯一且稳定。 相同的 transactional.id 在新 Producer 启动时会 fence 掉旧的 Producer。如果你使用动态生成,重启后无法恢复事务状态。
  2. 事务超时默认 60 秒。 如果你的处理逻辑超过这个时间,事务会自动中止。可以通过 transaction.timeout.ms 调整。
  3. read_committed 有性能开销。 Broker 需要维护事务状态,消费者需要过滤未提交的消息。高吞吐场景下,延迟会增加。
  4. 跨系统不支持。 Kafka 事务不能和数据库事务协调——如果你需要"Kafka 写入 + 数据库写入"的原子性,只能用两阶段提交或幂等补偿。

八、性能优化与调优

8.1 生产者优化

# 批次大小(字节),达到此大小才发送
batch.size=32768
# 等待时间(毫秒),即使批次未满也发送
linger.ms=10
# 压缩类型:none, gzip, snappy, lz4, zstd
# lz4 压缩比和速度平衡最好,zstd 压缩比更高但更耗 CPU
compression.type=lz4
# 生产者缓冲区大小(字节)
buffer.memory=67108864
# 确认机制:0(不等确认), 1(Leader 确认), all(所有 ISR 确认)
acks=all
# 重试次数
retries=3
# 单个请求最大字节数
max.request.size=1048576
# 连接上最大未确认请求数(开启幂等时 <= 5)
max.in.flight.requests.per.connection=5

调优建议:

  • batch.sizelinger.ms 是吞吐量 vs 延迟的权衡。高吞吐场景增大 batch.size,低延迟场景减小 linger.ms
  • 压缩类型选择:CPU 充裕选 zstd(压缩比最好),CPU 敏感选 lz4(速度最快)
  • acks=all 保证可靠性,但增加延迟。如果可以容忍少量丢消息,acks=1 可以显著提升吞吐

8.2 消费者优化

# 单次拉取最小字节数,不够就等
fetch.min.bytes=1024
# 单次拉取最大字节数
fetch.max.bytes=5242880
# 单次 poll 返回的最大记录数
max.poll.records=500
# 等待拉取响应的最长时间
fetch.max.wait.ms=500
# 会话超时(心跳超时后踢出消费者组)
session.timeout.ms=30000
# 两次 poll 的最大间隔(处理超时)
max.poll.interval.ms=300000
# 心跳间隔(通常为 session.timeout.ms 的 1/3)
heartbeat.interval.ms=10000
# 自动提交间隔
auto.commit.interval.ms=5000

调优建议:

  • fetch.min.bytes 增大可以减少拉取次数,提升吞吐,但增加延迟
  • max.poll.records 要和你的处理能力匹配——设置太大但处理不过来,会触发 rebalance
  • max.poll.interval.ms 根据你的消息处理耗时设置——设置太小会导致频繁 rebalance

8.3 Broker 优化

# 日志段大小,影响索引粒度和文件切换频率
log.segment.bytes=1073741824
# I/O 线程数(处理磁盘读写),建议 = 磁盘数
num.io.threads=8
# 网络线程数(处理网络请求),建议 = CPU 核数
num.network.threads=4
# 日志刷盘策略(依赖操作系统 page cache 通常更好)
# log.flush.interval.messages=10000
# log.flush.interval.ms=1000
# 后台线程数(日志清理、压缩等)
background.threads=4
# Socket 发送/接收缓冲区
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

调优建议:

  • 不要手动配置 log.flush.interval,让操作系统的 Page Cache 来做。手动刷盘会严重降低吞吐
  • num.io.threads 应该等于或略大于数据目录数(log.dirs 中的目录数)
  • num.network.threads 在高并发场景下适当增大,但过多会导致上下文切换

8.4 KRaft 特定优化

# Controller 选举超时,越短故障恢复越快,但越容易误触发选举
controller.quorum.election.timeout.ms=1000
# 选举请求超时
controller.quorum.request.timeout.ms=2000
# Follower 拉取元数据日志超时
controller.quorum.fetch.timeout.ms=2000
# 元数据日志最大批次大小
controller.quorum.fetch.max.bytes=1048576
# 元数据日志同步频率
metadata.log.max.record.memory.bytes=5242880
# 元数据偏移量快照间隔(影响重启恢复速度)
metadata.log.offset.snapshot.interval.ms=10000

调优建议:

  • election.timeout.ms 不要设太小(< 500ms),否则网络抖动会频繁触发选举
  • 元数据量大的集群(几千个 Topic),适当增大 fetch.max.bytes
  • 快照间隔影响重启恢复速度——间隔越大,重启后需要重放的日志越多

九、生产级最佳实践

9.1 集群规划建议

Controller 节点:

  • 3 个 Controller(可容忍 1 个故障)或 5 个(可容忍 2 个故障)
  • 专用机器,不要和 Broker 混部署
  • 配置独立的高性能磁盘(SSD,用于元数据日志)
  • CPU 和内存要求不高,但网络延迟要低

Broker 节点:

  • 根据数据量和吞吐计算节点数
  • 磁盘选择:JBOD(Just a Bunch Of Disks)优于 RAID,Kafka 自己管理分区分布
  • 内存:足够的 Page Cache,通常 JVM 堆 6-8GB,剩余给 Page Cache
  • 网络:万兆网卡在大规模集群中是必须的

容量估算公式:

每日数据量 = 每秒消息数 × 消息平均大小 × 86400
总存储需求 = 每日数据量 × 保留天数 × 副本数 × 1.1(预留 10%)
单 Broker 存储 = 总存储需求 / Broker 数
建议单 Broker 存储 < 磁盘容量的 70%

9.2 Topic 设计策略

分区数:

  • 分区数 = 期望的并行消费者数
  • 分区数 = 吞吐量需求 / 单分区吞吐量
  • 分区数不宜过多——每个分区都有内存和文件句柄开销,且影响 Controller 元数据管理
  • 单集群建议分区数 < 10 万(KRaft 模式下可以更高)

副本数:

  • 生产环境最少 3 副本
  • min.insync.replicas=2(配合 acks=all
  • 跨机房部署考虑 Rack Awareness

Key 设计:

  • 需要顺序保证的消息使用相同的 Key(路由到同一分区)
  • 避免热点 Key——考虑添加随机后缀打散
  • Key 为 null 时使用轮询分配,适合无序场景

9.3 安全配置

SASL/PLAIN 认证:

# Broker 配置
listeners=SASL_PLAINTEXT://:9092,CONTROLLER://:9093
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

# JAAS 配置(通过 JVM 参数传入)
# -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
# kafka_server_jaas.conf
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_producer="producer-secret"
    user_consumer="consumer-secret";
};

SSL/TLS 加密:

# Broker SSL 配置
listeners=SSL://:9093
security.inter.broker.protocol=SSL
ssl.keystore.location=/etc/kafka/ssl/kafka.server.keystore.jks
ssl.keystore.password=keystore-pass
ssl.key.password=key-pass
ssl.truststore.location=/etc/kafka/ssl/kafka.server.truststore.jks
ssl.truststore.password=truststore-pass
ssl.client.auth=required

ACL 授权:

authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
super.users=User:admin
allow.everyone.if.no.acl.found=false
# 创建 ACL
kafka-acls.sh --bootstrap-server localhost:9092 \
  --add --allow-principal User:producer \
  --operation Write --topic test-topic

kafka-acls.sh --bootstrap-server localhost:9092 \
  --add --allow-principal User:consumer \
  --operation Read --topic test-topic \
  --operation Read --group consumer-group

9.4 监控指标

必须监控的 Broker 指标:

指标JMX MBean告警阈值
在线 Broker 数kafka.controller:type=KafkaController,name=ActiveBrokerCount< 预期值
离线分区数kafka.controller:type=KafkaController,name=OfflinePartitionsCount> 0
活跃 Controllerkafka.controller:type=KafkaController,name=ActiveControllerCount!= 1
请求处理延迟kafka.network:type=RequestMetrics,name=RequestQueueSize持续增长
日志段大小kafka.log:type=Log,name=Size,topic=*,partition=*接近磁盘容量
ISR 缩减kafka.cluster:type=Partition,name=UnderReplicated,topic=*,partition=*> 0

必须监控的生产者指标:

指标说明
record-send-rate每秒发送记录数
record-error-rate每秒发送失败数
batch-size-avg平均批次大小
record-queue-time-avg记录在缓冲区等待时间
compression-rate压缩率

必须监控的消费者指标:

指标说明
records-consumed-rate每秒消费记录数
records-lag-max最大消费延迟(条数)
commit-rateOffset 提交速率
join-rate加入消费者组频率(频繁 = rebalance 问题)

Prometheus + Grafana 监控方案:

# kafka-exporter部署
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-exporter
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-exporter
  template:
    spec:
      containers:
      - name: kafka-exporter
        image: danielqsj/kafka-exporter:latest
        args:
          - --kafka.server=kafka-broker-1:9092
          - --kafka.server=kafka-broker-2:9092
          - --kafka.server=kafka-broker-3:9092
          - --web.listen-address=:9308
        ports:
        - containerPort: 9308

9.5 故障排查指南

问题 1:消费者频繁 Rebalance

症状:日志中出现 rebalance 相关信息,消费延迟飙升。

排查:

# 查看消费者组状态
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group your-group

# 检查消费者配置
# - max.poll.interval.ms 是否太小?
# - 消息处理是否太慢?
# - session.timeout.ms 是否合理?

解决:

  • 增大 max.poll.interval.ms(如果处理耗时较长)
  • 减小 max.poll.records(减少单次处理量)
  • 检查 GC 问题(消费者进程是否频繁 Full GC)

问题 2:生产者发送超时

症状:TimeoutExceptionNotEnoughReplicasException

排查:

# 检查 ISR 列表
kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --topic your-topic
# 关注 ISR 是否少于预期

解决:

  • ISR 不足:检查落后 Broker 的磁盘、网络、CPU
  • min.insync.replicas 设置过高:评估是否可以降低
  • 网络延迟:检查 Broker 之间网络连通性

问题 3:KRaft Controller 选举异常

症状:集群无活跃 Controller,所有管理操作失败。

排查:

# 查看 Controller 日志
grep "KRaft" /var/log/kafka/server.log | tail -50

# 检查 Quorum 状态
kafka-metadata-quorum.sh --bootstrap-server localhost:9092 \
  describe --status

# 检查投票者配置是否一致
grep "controller.quorum.voters" /etc/kafka/server.properties

解决:

  • 确保所有节点的 controller.quorum.voters 配置一致
  • 检查 Controller 节点之间的网络连通性
  • 检查磁盘空间(元数据日志无法写入会导致 Controller 退出)

问题 4:元数据不一致

KRaft 模式下元数据不一致比 ZooKeeper 模式少得多,但仍可能发生:

# 检查元数据快照
kafka-dump-log.sh --files /var/lib/kafka/data/@metadata-0/00000000000000000000.log \
  --cluster-metadata-decoder

# 强制元数据同步(慎用!)
# 删除 Broker 的元数据缓存,重启后会从 Controller 全量拉取

十、从 ZooKeeper 迁移到 KRaft

如果你还在 Kafka 3.x + ZooKeeper 上,迁移路径如下:

10.1 迁移前准备

  1. 升级到 Kafka 3.6+:Kafka 3.6 开始支持从 ZooKeeper 模式迁移到 KRaft 模式
  2. 确认集群健康:所有 Broker 在线,ISR 完整,无未完成的分区重分配
  3. 备份 ZooKeeper 数据:以防万一需要回滚
  4. 测试环境演练:先在测试环境完整走一遍迁移流程

10.2 迁移步骤(Kafka 3.6+)

# 1. 在 ZooKeeper 模式下启动迁移
# 在每个 Broker 的配置中添加:
# zookeeper.metadata.migration.enable=true

# 2. 逐步启动 KRaft Controller(与 ZooKeeper 并行运行)
# Controller 配置:
# controller.quorum.voters=1@ctrl1:9093,2@ctrl2:9093,3@ctrl3:9093
# zookeeper.connect=zookeeper:2181  # 仍然连接 ZooKeeper

# 3. 验证迁移状态
kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status

# 4. 迁移完成后,移除 ZooKeeper 相关配置
# 删除 zookeeper.connect
# 删除 zookeeper.metadata.migration.enable
# 重启所有 Broker

# 5. 停止 ZooKeeper

10.3 注意事项

  • 迁移过程中集群可以正常提供服务
  • 迁移是不可逆的——一旦完成,不能回退到 ZooKeeper 模式
  • Kafka 4.0 不支持迁移模式——如果你还在 ZooKeeper 模式,必须先在 3.6+ 上完成迁移,再升级到 4.0

十一、总结与展望

Kafka 4.0 移除 ZooKeeper 不是终点,而是一个新起点。

当前状态: KRaft 模式已经稳定,社区几年的打磨让它足够可靠。如果你是新项目,没有理由再碰 ZooKeeper。如果你是存量项目,迁移窗口已经打开——Kafka 3.6+ 提供了平滑迁移路径,没有理由再拖延。

未来方向:

  • Partition 级别的元数据管理优化,支持百万级 Partition
  • 更细粒度的 Controller 职责划分,降低单点压力
  • 与 Kafka 生态更紧密的整合(Kafka Connect、Kafka Streams 都已经适配 KRaft)
  • 云原生部署的持续优化

最后说一句: 技术选型从来不是选"最好的",而是选"最合适的"。但对于 Kafka + ZooKeeper 这个组合来说,KRaft 在几乎所有维度上都是更好的选择——更简单的运维、更快的恢复、更强的扩展性、更一致的元数据。这不是一个需要纠结的决定。

如果你的团队还在犹豫要不要迁移,我的建议很简单:开个测试环境,照着本文的步骤走一遍。 实际跑一遍,比看十篇文章都管用。


参考资料

推荐文章

paint-board:趣味性艺术画板
2024-11-19 07:43:41 +0800 CST
Elasticsearch 的索引操作
2024-11-19 03:41:41 +0800 CST
介绍Vue3的静态提升是什么?
2024-11-18 10:25:10 +0800 CST
php指定版本安装php扩展
2024-11-19 04:10:55 +0800 CST
Web浏览器的定时器问题思考
2024-11-18 22:19:55 +0800 CST
程序员茄子在线接单