One minute
Kafka Group Kip
这里主要讨论 kafka group 相关的协议: rebalance, partition 等
https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner
https://cwiki.apache.org/confluence/display/KAFKA/KIP-379%3A+Multiple+Consumer+Group+Management
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=89070828
在 group非常大的时候, rebalance 次数就会增加; rebalance 时间取决于最慢的consumer, group 越大, 慢consumer出现的概率就越大. 除此之外, group coordinator 可能多个 group 共享的, 所以彼此会影响.
这个提案中, 提出了 `consumer.group.max.size` 的概念, 对 server端进行了保护. 当有超过数量的member加入, 将会收到 异常.
之前都是 broker 在 收到 joinGroup request 的时候, 返回 uuid 给 client 作为 member.id, 在边缘case中(client不断重启加入), 可能导致内存膨胀. 这个 proposal 中, 就是需要用户手动提交 memebr.id
为了避免rebalance导致 有状态的应用程序的数据迁移.
目前的状态 broker group status: Running -> member JoinGroupRequest -> broker group status: PREPARE_REBALANCE -> broker group status: COMPLETING_REBALANCE -> sync group request (group members) -> SyncGroupResponse (broker send to memebrs)
其中, 第一个加入的 member 就是 group leader.
这个方案的实现, 是持久化 member.id(需要用户主动配置`group.instance.id`), 这样, 每次client 重启就不会重新生成 member.id. 在执行 assignmemt的时候, 可以根据 member.id 执行相同的分配. 这个就是 'static membership'. 没有配置 `group.instance.id` 的情况下, 则是 动态的, member.id 由 coordinator 分配.
membership rebalance protocol 被触发的情况:
1. 新的 member join
2. leader rejoin
3. existing member offline
4. member leave group(leave group request)
比较有意思的是, 是添加了 `group.instance.id`, 和 member.id 一起用在了 Join/Sync/Heartbeat/OffsetCommit request/responses. member.id 是 broker 生层的递增的数字.
在proposal中, 列举了 滚动升级中 range assigner 的列子, 如果是 dynamic membership, 就会引起迁移; 如果是 static membership, 就不会.
使用了 static membership, 只有 session timeout 才会在 broker 中移除 client. 之前在 COMPLETING_REBALANCE 阶段就会移除。
为了维护 `group.instance.id` 的分配关系, 会将映射存储在 “_consumer_offsets” 的 topic 里面
GroupCoordinator 在收到 加入 新的或者empty group, group 状态会进入 InitialRebalance, 并延迟 min(rebalanceTimeout, group.initial.rebalance.delay.ms) 等待其他的 member加入, 如果有member加入, 那么 延迟就会重置. 过期后就会进入 PreparingRebalance.
0.11.0 中发布的功能
212 Words
2019-11-07 09:54 +0800