2 minutes
Kafka Proto
debug方式启动服务端后, 通过在 KafkaApis#handle 设置断点, 可以发现启动过程中, broker 接受了三个请求:
UpdateMetadata -> LeaderAndIsr -> UpdateMetadata
但是, 后来通过调试分析发现, 只有当存在topic的时候, broker才会接收到三个请求, 没有topic的情况下, 其实, broker只会接受到一个请求.
没有topic的处理
没有topic的情况下, Kafka broker 只会接受到一个请求: UpdateMetadata. 执行流程如下:
KafkaController#startup -> Startup#process -> KafkaController#elect -> #onControllerFailover -> #sendUpdateMetadataRequest -> brokerRequestBatch#addUpdateMetadataRequestForBrokers
描述: Kafka Controller 启动后, 会竞选 leader, 成功后会立即触发发送一次 UpdateMetadata.
broker接受到请求后, 处理流程:
- 如果有删除的partition, 删除partition
- 完成topic的延时任务
有topic的处理
有topic的情况下, Kafka broker 会接受到三个请求: UpdateMetadata、LeaderAndIsr、UpdateMetadata. 通过debug, 执行流程如下:
-
UpdateMetadata:
同上. KafkaController 竞选leader成功后发送请求给Broker
-
LeaderAndIsr:
启动的时候, ReplicaStateMachine 发送 LeaderAndIsr 的请求给Broker.
-
UpdateMetadata:
KafkaController#startup -> Startup#process -> KafkaController#elect ->KafkaController#onControllerFailover -> ReplicaStateMachine#startup -> #handleStateChanges -> #doHandleStateChanges + ControllerBrokerRequestBatch#sendRequestsToBrokers -> #sendRequestsToBrokers
简述: 启动的时候, ReplicaStateMachine 在发送完 LeaderAndIsr 的请求, 再继续发送 UpdateMetadata 请求.
总结
仔细观察发现: 其实, 无论有没有topic, 整体的流程是一样的. 只是 ReplicaStateMachine 会处理topic存在的情况.
faq:
- 为什么有topic的时候, 会多出来两个请求?
根据阅读的理解, 多处理的两个请求, 只是处理topic-partition的一个行为, 以为只有一个topic、一个partition和一个broker, 所以只有两个请求. 一个是需要通知新副本关于 leader&ISR 信息, 二是 通知每一个broker这个partition的的更新元数据请求
相关的注释内容:
- OnlineReplica,OfflineReplica -> OnlineReplica
- –send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the
- partition to every live broker
核心处理逻辑
private def doHandleStateChanges(replicaId: Int, partitions: Seq[TopicPartition], targetState: ReplicaState,
callbacks: Callbacks): Unit = {
.......
case OnlineReplica =>
validReplicas.foreach { replica =>
val partition = replica.topicPartition
replicaState(replica) match {
case NewReplica =>
val assignment = controllerContext.partitionReplicaAssignment(partition)
if (!assignment.contains(replicaId)) {
controllerContext.updatePartitionReplicaAssignment(partition, assignment :+ replicaId)
}
case _ =>
controllerContext.partitionLeadershipInfo.get(partition) match {
case Some(leaderIsrAndControllerEpoch) =>
controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(replicaId),
replica.topicPartition,
leaderIsrAndControllerEpoch,
controllerContext.partitionReplicaAssignment(partition), isNew = false)
case None =>
}
}
logSuccessfulTransition(replicaId, partition, replicaState(replica), OnlineReplica)
replicaState.put(replica, OnlineReplica)
}
}
214 Words
2019-03-09 21:56 +0800