3 minutes
Kafka_group_coordinator
preface
因为工作内容涉及 kafka, 今天梳理下 kafka group coordinator
coordinator
group coordinator 是计算出来的, 是hash groupId计算出来取模元数据的partition个数, 代码如 Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
, 然后取这个partition的leader broker 就是这个group的coordinator. 具体在findCoordinator的实现中.
coordinator 的主要任务是: 1. consumer rebalance 2. offset 管理
coordinator rebalance 模块比较简单, 之前客户端开发的时候总结过一波. 如下:
客户端找到一个Node -> find coordinator protocol -> onJoinPrepare(子类) -> join group protocol -> sync group protocol(onJoinLeader 包含了任务分配的结果/follower 空的assignment) -> enable heartbeat ->onJoinComplete(子类处理分配结果). 心跳线程处理 coordinator的网络连接. leader 是 coordinator 选举的.
在group coordinator的kafka server端, 主要处理 join group protocol 和 sync group protocol. 在处理join操作的时候, 一个重点就是维护参加 rebalane 的consumer, 为此, Kafka 抽象了 MemberMetadata 表示 member 的状态, MemberMetadata 维护了 joinCallback 和 syncCallback, 这样join/sync结束的时候可以通过 joinCallback 通知consumer. 使用 GroupMetadata 表示 group 的状态, 记录协议的兼容性、group generation、offset管理 等等 注意: member的 clientId 默认是 clientId/instanceId-uuid; 每次rebalance都会递增 generation
基本概念
触发rebalance的时机:
- 组成员个数发生变化 (加入或者离开)
- Topic 个数发生变化
- Topic 分区数发生变化
如何感知消费者进程挂掉?
- session 过期 (????)
- heartbeat 过期
(ps: 可以作为基本的服务端开发者的概念面试)
消费者如何感知rebalance:
coordinator 在触发rebalance之后, 会进入 PreparingRebalance
的状态, 然后发送 REBALANCE_IN_PROGRESS
响应给客户端的心跳或者sync 触发client重新rebalance
选举
coordinator 选举依赖 __consumers_offsets
topic, __consumers_offsets
是 coordinator 用来存储 consumer acked offset, 通过确定 group offset写入的分区 来确定 coordinator, 分区 leader 就是指定group的 coordinator. 实现参看 kafkaApis#handleFindCoordinatorRequest
. group -> coordinator 的算法如下:
class GroupMetadataManager{
// GROUP_METADATA_TOPIC_NAME 就是 __consumer_offsets
private def getGroupMetadataTopicPartitionCount: Int = {
zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicNumPartitions)
}
/* number of partitions for the consumer metadata topic */
private val groupMetadataTopicPartitionCount = getGroupMetadataTopicPartitionCount
def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
}
consumer 的管理
consumer 的维护是基于心跳实现, 也是基于 DelayedOperation 的实现: DelayedHeartbeat. 触发 DelayedHeartbeat 投递到 延迟队列有很多情况, 常见的有 join、rebalance 结束 和 consumer 每次投递心跳. 前两个是server触发延迟心跳, 后者通过客户端心跳进行周期性投递触发. consumer 发送 heartbeat 的时候, 服务端就会尝试将 之前的 heartbeat tryComplete
, 进而触发 onComplete
.
问题:
特殊的参数 isPending
, unknow join 的时候, kafka group coordinator 会生成一个 memberId, 并将新的memberId下发到consumer, 并要求consumer在心跳超时时间内投递心跳. 在静态成员 这个feature之前, 所有的memberId都是 group coordinator 自动生成的, 并且是不保存的, 这样的设计之下, 导致了 group member 在join的时候, 所有的 member都需要强制加入 (无法做到粘性分配). 在 静态成员 feature之后, 虽然 clientId 还是动态的, 但是可以确保没有超时之前, 静态成员的加入不会触发 rebalance.
补充: 早期的实现中, 心跳和处理流程在一个线程, 导致处理时间太长的场景下心跳失活被踢出group, 引起不必要的rebalance. 详情参看 https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
join&sync 设计
首先回顾下 join 的几个主要设计
早期
早期的join(stop-the-world/Eager/global/dynamic membership)实现, 每次触发rebalance, 就需要所有consumer重新加入, 在sync 结果之前, 是不能消费的. 整体流程是: 延迟join -> leader calc -> sync result. 并且如果是unknow group join, 还需要服务端重新生成 clientId并触发consumer重新加入.
参考: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Detailed+Consumer+Coordinator+Design
对于有状态的应用, 创建一个group并生成多个consumer消费的时候, 会发现有多次rebalance, 因此每个consumer加入group都会触发一次rebalance, 对于有状态的应用 (需要初始化一些资源) 会很重.
DelayJoin的设计: https://cwiki.apache.org/confluence/display/KAFKA/KIP-134%3A+Delay+initial+consumer+group+rebalance
static member
对于有状态的应用, 每次rebalance都是很严重的, 因为涉及到数据迁移, 术语是 state shuffling
, 因此设计了 static member, 通过固化 group.instance.id 减少状态迁移, 上层consumer分配能够将 相同的assignment
分配给 同一个的instance.id. 细节参看
- https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances
- https://www.confluent.io/blog/kafka-rebalance-protocol-static-membership/
注意这个是 group.instance.id, 并不是 clientId, clientId 依旧是动态的, 存在 group.instance.id 的时候, 会拼接 group.instance.id 和 UUID. 另外, 这种机制下, consumer offline 就不会发送 leave group request
, 依赖 group sessionTimeout 触发 group rebalance.
渐进式
因为动态rebalance是全部撤销partition分配在进行重分配的, 这里会导致严重的 stop-the-world 问题. 作为对比, rocketmq 在消费的时候, consumer 只有在计算知道partition不属于自己的时候才会停止消费, 而不是每次rebalance之前就停止所有消费, 一定程度上降低了 stop-the-world 的风险. 为此, kafka 提出了 Incremental Cooperative Rebalancing
的想法, 避免rebalance的时候释放资源, 降低释放-重新获取的开销. 这个需求推测来自 kafka-connect 和 kafka-stream. 但是在 k8s 滚动升级、死亡恢复、垂直扩容/缩容 的场景中还是很重要的. 这个想法其实早在 connect 中就实现了, 参考 https://www.confluent.io/blog/incremental-cooperative-rebalancing-in-kafka/
.
渐进式最早的文章可以参考: https://cwiki.apache.org/confluence/display/KAFKA/Incremental+Cooperative+Rebalancing%3A+Support+and+Policies
, 最早在connect相关的kip是 https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect
, kafka consumer相关的kip是 https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol
.
渐进式方案中, 其实有两种实现, 一种是 kafka connect, 一种是 kafka consumer. consumer 和connect 在实现上不一样, 因为并不需要引入 scheduledDelay
. 整体实现上并不需要broker参与, 主要是 consumer 实现, 需要修改默认行为·协议以及配合sticky assigner:CooperativeStickyAssignor
coding
join/sync的处理主体逻辑: GroupCoordinator#handleJoinGroup/handleSyncGroup, join因为存在一个延迟特性, 和心跳类似, 也是一个 DelayedOperation
的设计, 每次consumer join都会尝试结束 group 的 rebalance.
offset 提交
kafka 0.9 以及之后的版本, 都是用 kafka compacted log 存储日志, topic 是 __consumer_offsets
. 基于 replicaManager 实现, 参考 GroupMetadataManager#appendForGroup.
其他设计
rebalance generation 设计, 隔离无效offset提交的。比如上一届的consumer成员是无法提交位移到新一届的consumer group中
sessionTimeout kafka规定了 min 和max, sessionTimeout 需要在这两个之间, 就是心跳的超时时间 rebalanceTimeout
group 在 kafka 的几种状态:
Dead:组内已经没有任何成员的最终状态,组的元数据也已经被coordinator移除了。这种状态响应各种请求都是一个response:UNKNOWN_MEMBER_ID
Empty:组内无成员,但是位移信息还没有过期。这种状态只能响应JoinGroup请求
PreparingRebalance:组准备开启新的rebalance,等待成员加入
CompletingRebalance:正在等待leader consumer将分配方案传给各个成员
Stable:rebalance完成!可以开始消费了