最近学习 kafka-connect 的设计和实现. 其中设计到 group member protocol 的内容. 这里展开学习.

kafka group member 协议

主要参考 AbstractCoordinator 的实现流程 以及 ConsumerCoordinator 的实现.

整体生命流程:

找到一个Node -> find coordinator protocol -> onJoinPrepare(子类) -> join group protocol -> sync group protocol(onJoinLeader 包含了任务分配的结果/follower 空的assignment) -> enable heartbeat ->onJoinComplete(子类处理分配结果). 心跳线程处理 coordinator的网络连接. leader 是 coordinator 选举的. 

ConsumerCoordinator子类

从上面的流程中可以知道, 继承 AbstractCoordinator 的子类, 需要实现 onJoinPrepare、metadata、onLeavePrepare、performAssignment、onJoinComplete

  • onJoinPrepare: 在 eager 模式下, 上次分配的内容全部 revoked; 在 COOPERATIVE 模式下, 只撤回不在定于的 topic 的 partition.
  • metadata: sendJoinGroupRequest使用的数据信息, 用于后面的分配. 会提交上次分配的 assignemnt.
  • performAssignment: 执行分配
  • onJoinComplete: 回调assigner处理分配结果

学习

  1. 注意, 在配置的时候, 每个source集群需要配置一个不一样的 groupId. kafkaConsumer 支持 正则表达式订阅, 并且动态更新元数据, 保证及时订阅到 新增的 topic.

  2. poll 调用的时候 会执行 join group 流程. 通过不断调用 cleints.poll 就可以保证自己在 group 里面. 通过配置 max.poll.interval.ms 避免假死过程中持续的心跳导致 partition 持有问题的活锁.

partition < consumers 会发生什么呢?