One minute
Kafka Consumer Proto
Debug
- 参照debug文章在intellij启动Kafka
2.terminal 发送请求:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
服务端的交互
通过分析发现交互的请求协议如下
API_VERSIONS METADATA FindCoordinator ApiVersions JoinGroup SyncGroup OffsetFetch ApiVersion ListOffsets Fetch ... Heartbeat ... OffsetCommit ...Fetch...
细节分析
-
API_VERSIONS
返回 broker 是否支持client version
-
METADATA
获取broker信息, 以及相应的partition信息
-
FindCoordinator
获取coordinator的信息
-
ApiVersions
同上
-
JoinGroup
添加到 group 中, 等待运算
-
SyncGroup
client leader 同步运算结果, 并返回
-
OffsetFetch
v0 从zk读, 后面的版本通过 GroupCoordinator#handleFetchOffsets 获取offset
-
ApiVersion
同上
-
ListOffsets
获取Offset(分区的偏移量有效范围, lso和leo)
-
Fetch
ReplicaManager#fetchMessages -> #readFromLocalLog -> Partition#readRecords -> Log#read -> LogSegment#read -> 通过网络层发送文件内容
-
Heartbeat
更新member信息
-
OffsetCommit
v0: 直接写入 zookeeper
后面的版本: GroupCoordinator#handleCommitOffsets -> #doCommitOffsets -> GroupManager#storeOffsets -> #appendForGroup -> ReplicaManager#appendRecords -> #appendToLocalLog -> Partition#appendRecordsToLeader -> Log#appendAsLeader -> #append -> LogSegment#append -> FileRecords#append -> MemoryRecords#writeFullyTo
除了早期的版本, 直接写入zk, 后面的版本, 是直接写入 kafka 的分布式文件上的
faq
- 为什么发送处理那么多次 API_VERSIONS
- OffsetFetch和ListOffsets?
112 Words
2019-03-03 21:40 +0800