Debug

  1. 参照debug文章在intellij启动Kafka

2.terminal 发送请求:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

服务端的交互

通过分析发现交互的请求协议如下

API_VERSIONS METADATA FindCoordinator ApiVersions JoinGroup SyncGroup OffsetFetch ApiVersion ListOffsets Fetch ... Heartbeat ... OffsetCommit ...Fetch...

细节分析

  1. API_VERSIONS

    返回 broker 是否支持client version

  2. METADATA

    获取broker信息, 以及相应的partition信息

  3. FindCoordinator

    获取coordinator的信息

  4. ApiVersions

    同上

  5. JoinGroup

    添加到 group 中, 等待运算

  6. SyncGroup

    client leader 同步运算结果, 并返回

  7. OffsetFetch

    v0 从zk读, 后面的版本通过 GroupCoordinator#handleFetchOffsets 获取offset

  8. ApiVersion

    同上

  9. ListOffsets

    获取Offset(分区的偏移量有效范围, lso和leo)

  10. Fetch

    ReplicaManager#fetchMessages ->  #readFromLocalLog -> Partition#readRecords ->  Log#read -> LogSegment#read -> 通过网络层发送文件内容
  1. Heartbeat

    更新member信息

  2. OffsetCommit

    v0: 直接写入 zookeeper 
    后面的版本: GroupCoordinator#handleCommitOffsets -> #doCommitOffsets -> GroupManager#storeOffsets -> #appendForGroup -> ReplicaManager#appendRecords -> #appendToLocalLog -> Partition#appendRecordsToLeader -> Log#appendAsLeader -> #append -> LogSegment#append -> FileRecords#append -> MemoryRecords#writeFullyTo 

除了早期的版本, 直接写入zk, 后面的版本, 是直接写入 kafka 的分布式文件上的

faq

  1. 为什么发送处理那么多次 API_VERSIONS
  2. OffsetFetch和ListOffsets?