debug方式启动服务端后, 通过在 KafkaApis#handle 设置断点, 可以发现启动过程中, broker 接受了三个请求:

UpdateMetadata -> LeaderAndIsr -> UpdateMetadata

但是, 后来通过调试分析发现, 只有当存在topic的时候, broker才会接收到三个请求, 没有topic的情况下, 其实, broker只会接受到一个请求.

没有topic的处理

没有topic的情况下, Kafka broker 只会接受到一个请求: UpdateMetadata. 执行流程如下:

 KafkaController#startup -> Startup#process -> KafkaController#elect -> #onControllerFailover -> #sendUpdateMetadataRequest -> brokerRequestBatch#addUpdateMetadataRequestForBrokers

描述: Kafka Controller 启动后, 会竞选 leader, 成功后会立即触发发送一次 UpdateMetadata.

broker接受到请求后, 处理流程:

  • 如果有删除的partition, 删除partition
  • 完成topic的延时任务

有topic的处理

有topic的情况下, Kafka broker 会接受到三个请求: UpdateMetadata、LeaderAndIsr、UpdateMetadata. 通过debug, 执行流程如下:

  1. UpdateMetadata:

    同上. KafkaController 竞选leader成功后发送请求给Broker

  2. LeaderAndIsr:

    启动的时候, ReplicaStateMachine 发送 LeaderAndIsr 的请求给Broker.

  3. UpdateMetadata:

KafkaController#startup -> Startup#process -> KafkaController#elect ->KafkaController#onControllerFailover -> ReplicaStateMachine#startup -> #handleStateChanges -> #doHandleStateChanges +  ControllerBrokerRequestBatch#sendRequestsToBrokers -> #sendRequestsToBrokers

简述: 启动的时候, ReplicaStateMachine 在发送完 LeaderAndIsr 的请求, 再继续发送 UpdateMetadata 请求.

总结

仔细观察发现: 其实, 无论有没有topic, 整体的流程是一样的. 只是 ReplicaStateMachine 会处理topic存在的情况.

faq:

  1. 为什么有topic的时候, 会多出来两个请求?

根据阅读的理解, 多处理的两个请求, 只是处理topic-partition的一个行为, 以为只有一个topic、一个partition和一个broker, 所以只有两个请求. 一个是需要通知新副本关于 leader&ISR 信息, 二是 通知每一个broker这个partition的的更新元数据请求

相关的注释内容:

  • OnlineReplica,OfflineReplica -> OnlineReplica
  • –send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the
  • partition to every live broker

核心处理逻辑

private def doHandleStateChanges(replicaId: Int, partitions: Seq[TopicPartition], targetState: ReplicaState,
                                   callbacks: Callbacks): Unit = {
      .......
      case OnlineReplica =>
        validReplicas.foreach { replica =>
          val partition = replica.topicPartition
          replicaState(replica) match {
            case NewReplica =>
              val assignment = controllerContext.partitionReplicaAssignment(partition)
              if (!assignment.contains(replicaId)) {
                controllerContext.updatePartitionReplicaAssignment(partition, assignment :+ replicaId)
              }
            case _ =>
              controllerContext.partitionLeadershipInfo.get(partition) match {
                case Some(leaderIsrAndControllerEpoch) =>
                  controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(replicaId),
                    replica.topicPartition,
                    leaderIsrAndControllerEpoch,
                    controllerContext.partitionReplicaAssignment(partition), isNew = false)
                case None =>
              }
          }
          logSuccessfulTransition(replicaId, partition, replicaState(replica), OnlineReplica)
          replicaState.put(replica, OnlineReplica)
        }
    
}