Preface

kafka controller 是 kafka 设计中重要的一环, 负责 kafka 集群状态 、topic 元数据的缓存和管理(topic、replica的管理), 下面分成两部分分析 kafka controller. 一部分是 kafka controller 的redesign 设计, 第二部分是从源代码分析和思考

Redesign

网上关于 redesign(https://docs.google.com/document/d/1rLDmzDOGQQeSiMANP0rC2RYp_L7nUGHzFD9MQISgXYM/edit#heading=h.pxfjarumuhko) 翻译的版本很多, 但是在看这个内容之前, 我们需要思考几个问题: 1. 为什么要redesign, 存在什么问题 2. 如何解决这些问题.

从文章内容上来看, 主要存在以下问题:

  1. 每个partition的zk写入是同步, 并发度不够(比如broker宕机导致的 partition leader重新选举、replica摘除)
  2. controller-broker 的请求也是每个partition 顺序的 (StopReplicaRequests LeaderAndIsrRequest UpdateMetadataRequest 这些并发量很大的请求, broker宕机的时候触发)
  3. 并发管理复杂 (controller-broker channel、zk、kafkaApi 都会操作)
  4. controller 代码组织混乱 (replicaStateMachine 和 topicStateMachine 的状态边界分的不是很清晰, 需要一些状态同步)
  5. 控制面和数据面没有分离, 控制面的命令不能及时下达, 会导致什么问题呢? (新选举的leader无法及时通知正在忙于处理用户数据的旧的leader, ack=1 和0 的会导致数据丢失)
  6. controller-broker 的请求 没有 broker generation, 会导致什么问题呢? (broker收到之前的controller过期的请求)
  7. zkClinet 没有状态管理 导致了什么问题? (线性处理逻辑导致最新的notification没有被及时通知到)

针对上面的问题, 又是怎么处理的呢? 基于文档和 当前版本2.6

  1. 使用异步的zk api
  2. controller-broker 请求批量 (实现了)
  3. 单线程的事件处理模型 (队列抽象: ControllerEvent+QueuedEvent+ControllerEventThread 的设计)
  4. 重构cluster的状态管理 (抽象了ControllerState和处理模型)
  5. controller 请求设置一个优先级 (没做, 因为两个队列会导致 优先级/普通队列 延迟高)
  6. controller-broker 请求有generation的概念 (epoch, 怎么实现?)
  7. 使用 vanilla zk 客户端, 更好的处理客户端状态 (回调, 结合event queue clear, 提供了client状态的回调)

Code Review

Event 设计

先从整体上看下, 在目前的版本 “2.6” 中, 最大的改动是 事件的抽象, 事件的抽象不是简单的event, 而是包含了整理处理流程的设计. 参考 ControllerEventManager.scala . 主要的对象如下:

trait ControllerEventProcessor {
  def process(event: ControllerEvent): Unit
  def preempt(event: ControllerEvent): Unit
}

class QueuedEvent(val event: ControllerEvent,
                  val enqueueTimeMs: Long) {
}

class ControllerEventManager(controllerId: Int,
                             processor: ControllerEventProcessor,
                             time: Time,
                             rateAndTimeMetrics: Map[ControllerState, KafkaTimer]) extends KafkaMetricsGroup {

  def put(event: ControllerEvent): QueuedEvent = inLock(putLock) {
    val queuedEvent = new QueuedEvent(event, time.milliseconds())
    queue.put(queuedEvent)
    queuedEvent
  }

  class ControllerEventThread(name: String) extends ShutdownableThread(name = name, isInterruptible = false) {
    logIdent = s"[ControllerEventThread controllerId=$controllerId] "

    override def doWork(): Unit = {
      val dequeued = queue.take()
      dequeued.event match {
        case ShutdownEventThread => // The shutting down of the thread has been initiated at this point. Ignore this event.
        case controllerEvent =>
          _state = controllerEvent.state

          eventQueueTimeHist.update(time.milliseconds() - dequeued.enqueueTimeMs)

          try {
            def process(): Unit = dequeued.process(processor)

            rateAndTimeMetrics.get(state) match {
              case Some(timer) => timer.time { process() }
              case None => process()
            }
          } catch {
            case e: Throwable => error(s"Uncaught error processing event $controllerEvent", e)
          }

          _state = ControllerState.Idle
      }
    }
  }
}

event 会投递先投递到queue中, 然后由 单线程 ControllerEventThread 不断从队列中获取然后处理, 处理流程 调用event#process, 其实最终调用的是 ControllerEventProcessor#process . 也就是说 event 一次进入队列, 但是最终处理都是统一在了 ControllerEventProcessor#process, 通过队列简化了并发问题. 在Kafka 中, ControllerEventProcessor 只有 KafkaController 这个唯一实现.

存储: zk&ControllerContext

kafka 基于 zk 做了元数据存储和动态监听, 除了 zk, kafka 在本地也缓存了一份数据在 ControllerContext. zk的paht和数据编解码在 ZkData 目录下.

在实施细节层面, zk 是基于异步 的方式, 对于常用的zk操作命令进行了重新的封装, 进行zk请求的时候通过注册 DataCallback/StatCallback/Children2Callback 等进行异步操作. 参见 zookeeper/ZookeeperClient.scala

但是Kafka 并不是直接操作 ZookeeperClient, 而是在 ZookeeperClient 上封装的 AdminZkClient 和 KafkaZkClient, 位于 zk 文件夹下

除了直接操作zk, kafka controller 还会通过动态监听 zk节点信息, KafkaController对zk的zknode节点注册相应的handler, znode事件被触发之后 就会通过handler投递到 eventManager 的queue, 进行异步处理(上面的事件抽象).

补充: ControllerContext 作为缓存, 不仅仅是zk 数据的全局快照, 而且还解耦了 PartitionStateMachine ReplicaStateMachine和 TopicDeletionManager, 避免相互依赖

controler-broker

controller 本质上是选主成功的 broker, controller 会进行topic、replica 等状态的维护, controller 会把 每次变更都 同步给其他broker. 变更同步的场景中, 在 0.11 版本之前是一个个broker进行同步阻塞的, 目前的版本中, 多个broker虽然也是轮流发送的, 但是 接收响应却是异步的, 通过这种方式提高了并发度, kafka 封装了场景的命令同步操作: LeaderAndIsrRequest、UpdateMetadataRequest、StopReplicaRequest, 对外提供了 ControllerBrokerRequestBatch 的设计. 消息回调的处理依旧使用了 event 机制避免并发混乱, 简化实现流程.

网络通信层, broker 和 controller 的通信走的是 单线程 + 队列 + NIO selector 的设计, 每个broker单独一个线程处理.

状态抽象

采用了 事件处理机制之后, 有时候我们希望知道 controller 的内部状态处理的耗时, 这样我们就能很快定位到问题确保 Kafka 健康. 为此, kafka 抽象了 ControllerState, 并在 ControllerEventThread 处理的时候进行打点. 也就是说 kafka state 的抽象目前仅仅是用来打日志

参看官方: https://cwiki.apache.org/confluence/display/KAFKA/KIP-143%3A+Controller+Health+Metrics

状态机

PartitionStateMachine 负责partition 状态变更逻辑, kafkaController 是上层调用者, 自身主体逻辑位于 doHandleStateChanges , 比较有意思的事情是, partition的leader选举分散到了两个地方, 本身的 electLeaderForPartitions 逻辑逻辑 以及 Election 算法实现, 将算法单独放在了 Election.scala 文件

ZkReplicaStateMachine 负责replica的维护: 库容、缩容. 主体逻辑在: doHandleStateChanges

TopicDeletionManager 处理topic删除逻辑, 为此删除单独开了一个zk节点, 避免对topic节点的干扰, 方面维护topic删除状态.

总结

针对文件, 进行一下总结:

  • ControllerChannelManager: 每个broker维护一个 NIO Selector + queue + 线程. 和0.11.0之前版本, 支持了批量, 同一个命令多个broker同时并行发送, 使用回调处理结果 (ControllerChannelManager.scala)
  • 批量的设计参考 ControllerBrokerRequestBatch: ((ControllerChannelManager.scala))
  • ControllerContext 抽象, 提供内部元数据的接口: 获取指定状态的partition、replica等. (ControllerContext.scala)
  • ControllerEventManager: Event 抽象, controller、broker、topic、partition、replica 等变更都是一个event, ControllerEventManager&ControllerEventThread 单线程处理
  • ControllerState: 和event一一对应, 为什么呢?
  • Election: 几种partition leader选举上线
  • KafkaController: 基于event处理机制, 处理相应的事件, 注册handler监听zk
  • replicaStateMachine: 处理replica内部的状态流转
  • partitionStateMachine: 处理partition内部的状态流转
  • TopicDeletionManager: 管理topic删除逻辑

关于选主

….

思考

  1. epoch

为了能够明显的区分 generation 的概念, kafka 使用了 zxid 作为 epoch.

  • brokerEpoch: 通过挂在临时节点 /brokers/ids/$id 获取的 czxid 作为epoch
  • controllerEpoch: 挂载zk节点/controller_epoch, 自己做的自增
  • partition 会存储 leader epoch 和 controllerEpoch, 是为什么呢? leader epoch 存储的每次选主 都是自增的, 注意不是broker epoch. 为什么partition会存储controllerEpoch 呢? 这样避免controller脑裂. 在 partition 第一次初始化, 在进入online状态的时候, 会存储一个 brokerEpoch=0, 并同步到broker. 后面选主之后在更新.
  1. zxid

zookeeper 的每次节点变更都会受到一个 ZxId 格式的时间戳, 本质上一个 64位数字, 高32位 epoch是 leader 变更标记, 每次leader 选举, 都会变更. 低32位是一个递增计数. zookeeper 为每个节点维护了两个 zxid:

  • czxid: 节点创建所对应的zxid
  • mzxid: 节点修改对应的zxid

参考: - https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Controller+Internals - https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Controller+Redesign - https://docs.google.com/document/d/1rLDmzDOGQQeSiMANP0rC2RYp_L7nUGHzFD9MQISgXYM/edit#heading=h.pxfjarumuhko