preface

因为工作内容涉及 kafka, 今天梳理下 kafka group coordinator

coordinator

coordinator 的主要任务是: 1. consumer rebalance 2. offset 管理

coordinator rebalance 模块比较简单, 之前客户端开发的时候总结过一波. 如下:

客户端找到一个Node -> find coordinator protocol -> onJoinPrepare(子类) -> join group protocol -> sync group protocol(onJoinLeader 包含了任务分配的结果/follower 空的assignment) -> enable heartbeat ->onJoinComplete(子类处理分配结果). 心跳线程处理 coordinator的网络连接. leader 是 coordinator 选举的. 

在group coordinator的kafka server端, 主要处理 join group protocol 和 sync group protocol. 在处理join操作的时候, 一个重点就是维护参加 rebalane 的consumer, 为此, Kafka 抽象了 MemberMetadata 表示 member 的状态, MemberMetadata 维护了 joinCallback 和 syncCallback, 这样join/sync结束的时候可以通过 joinCallback 通知consumer. 使用 GroupMetadata 表示 group 的状态, 记录协议的兼容性、group generation、offset管理 等等 注意: member的 clientId 默认是 clientId/instanceId-uuid; 每次rebalance都会递增 generation

基本概念

触发rebalance的时机:

  • 组成员个数发生变化 (加入或者离开)
  • Topic 个数发生变化
  • Topic 分区数发生变化

如何感知消费者进程挂掉?

  • session 过期 (????)
  • heartbeat 过期

(ps: 可以作为基本的服务端开发者的概念面试)

消费者如何感知rebalance:

coordinator 在触发rebalance之后, 会进入 PreparingRebalance 的状态, 然后发送 REBALANCE_IN_PROGRESS 响应给客户端的心跳或者sync 触发client重新rebalance

选举

coordinator 选举依赖 __consumers_offsets topic, __consumers_offsets 是 coordinator 用来存储 consumer acked offset, 通过确定 group offset写入的分区 来确定 coordinator, 分区 leader 就是指定group的 coordinator. 实现参看 kafkaApis#handleFindCoordinatorRequest. group -> coordinator 的算法如下:

class GroupMetadataManager{

  // GROUP_METADATA_TOPIC_NAME 就是 __consumer_offsets
  private def getGroupMetadataTopicPartitionCount: Int = {
    zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicNumPartitions)
  }

  /* number of partitions for the consumer metadata topic */
  private val groupMetadataTopicPartitionCount = getGroupMetadataTopicPartitionCount

  def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount	
}  

consumer 的管理

consumer 的维护是基于心跳实现, 也是基于 DelayedOperation 的实现: DelayedHeartbeat. 触发 DelayedHeartbeat 投递到 延迟队列有很多情况, 常见的有 join、rebalance 结束 和 consumer 每次投递心跳. 前两个是server触发延迟心跳, 后者通过客户端心跳进行周期性投递触发. consumer 发送 heartbeat 的时候, 服务端就会尝试将 之前的 heartbeat tryComplete, 进而触发 onComplete.

问题: 特殊的参数 isPending, unknow join 的时候, kafka group coordinator 会生成一个 memberId, 并将新的memberId下发到consumer, 并要求consumer在心跳超时时间内投递心跳. 在静态成员 这个feature之前, 所有的memberId都是 group coordinator 自动生成的, 并且是不保存的, 这样的设计之下, 导致了 group member 在join的时候, 所有的 member都需要强制加入 (无法做到粘性分配). 在 静态成员 feature之后, 虽然 clientId 还是动态的, 但是可以确保没有超时之前, 静态成员的加入不会触发 rebalance.

补充: 早期的实现中, 心跳和处理流程在一个线程, 导致处理时间太长的场景下心跳失活被踢出group, 引起不必要的rebalance. 详情参看 https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread

join&sync 设计

首先回顾下 join 的几个主要设计

早期

早期的join(stop-the-world/Eager/global/dynamic membership)实现, 每次触发rebalance, 就需要所有consumer重新加入, 在sync 结果之前, 是不能消费的. 整体流程是: 延迟join -> leader calc -> sync result. 并且如果是unknow group join, 还需要服务端重新生成 clientId并触发consumer重新加入.

参考: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Detailed+Consumer+Coordinator+Design

对于有状态的应用, 创建一个group并生成多个consumer消费的时候, 会发现有多次rebalance, 因此每个consumer加入group都会触发一次rebalance, 对于有状态的应用 (需要初始化一些资源) 会很重.

DelayJoin的设计: https://cwiki.apache.org/confluence/display/KAFKA/KIP-134%3A+Delay+initial+consumer+group+rebalance

static member

对于有状态的应用, 每次rebalance都是很严重的, 因为涉及到数据迁移, 术语是 state shuffling, 因此设计了 static member, 通过固化 group.instance.id 减少状态迁移, 上层consumer分配能够将 相同的assignment 分配给 同一个的instance.id. 细节参看

注意这个是 group.instance.id, 并不是 clientId, clientId 依旧是动态的, 存在 group.instance.id 的时候, 会拼接 group.instance.id 和 UUID. 另外, 这种机制下, consumer offline 就不会发送 leave group request, 依赖 group sessionTimeout 触发 group rebalance.

渐进式

因为动态rebalance是全部撤销partition分配在进行重分配的, 这里会导致严重的 stop-the-world 问题. 作为对比, rocketmq 在消费的时候, consumer 只有在计算知道partition不属于自己的时候才会停止消费, 而不是每次rebalance之前就停止所有消费, 一定程度上降低了 stop-the-world 的风险. 为此, kafka 提出了 Incremental Cooperative Rebalancing 的想法, 避免rebalance的时候释放资源, 降低释放-重新获取的开销. 这个需求推测来自 kafka-connect 和 kafka-stream. 但是在 k8s 滚动升级、死亡恢复、垂直扩容/缩容 的场景中还是很重要的. 这个想法其实早在 connect 中就实现了, 参考 https://www.confluent.io/blog/incremental-cooperative-rebalancing-in-kafka/.

渐进式最早的文章可以参考: https://cwiki.apache.org/confluence/display/KAFKA/Incremental+Cooperative+Rebalancing%3A+Support+and+Policies, 最早在connect相关的kip是 https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect, kafka consumer相关的kip是 https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol.

渐进式方案中, 其实有两种实现, 一种是 kafka connect, 一种是 kafka consumer. consumer 和connect 在实现上不一样, 因为并不需要引入 scheduledDelay. 整体实现上并不需要broker参与, 主要是 consumer 实现, 需要修改默认行为·协议以及配合sticky assigner:CooperativeStickyAssignor

coding

join/sync的处理主体逻辑: GroupCoordinator#handleJoinGroup/handleSyncGroup, join因为存在一个延迟特性, 和心跳类似, 也是一个 DelayedOperation的设计, 每次consumer join都会尝试结束 group 的 rebalance.

offset 提交

kafka 0.9 以及之后的版本, 都是用 kafka compacted log 存储日志, topic 是 __consumer_offsets. 基于 replicaManager 实现, 参考 GroupMetadataManager#appendForGroup.

其他设计

rebalance generation 设计, 隔离无效offset提交的。比如上一届的consumer成员是无法提交位移到新一届的consumer group中

sessionTimeout kafka规定了 min 和max, sessionTimeout 需要在这两个之间, 就是心跳的超时时间 rebalanceTimeout

group 在 kafka 的几种状态:

Dead:组内已经没有任何成员的最终状态,组的元数据也已经被coordinator移除了。这种状态响应各种请求都是一个response:UNKNOWN_MEMBER_ID
Empty:组内无成员,但是位移信息还没有过期。这种状态只能响应JoinGroup请求
PreparingRebalance:组准备开启新的rebalance,等待成员加入
CompletingRebalance:正在等待leader consumer将分配方案传给各个成员
Stable:rebalance完成!可以开始消费了

参考: