One minute
Pulsar Broker Producer Proto
Debug
1.环境准备: 参照 之前的方式
2.terminal req:
- 先发送produce请求 bin/pulsar-client produce my-topic –messages “hello-pulsar1” bin/pulsar-client produce my-topic –messages “hello-pulsar2”
服务端的交互
通过之前的调试方法, producer 和 broker 之间的交互协议:
CONNECT PARTITIONED_METADATA LOOKUP CONNECT PRODUCER SEND CLOSE_PRODUCER
-
Connect: lookup服务 进行权限校验
-
PARTITIONED_METADATA: lookup服务获取topic的partition数量, zk查找 /admin/partitioned-topics/namespace/domain/topicName.
-
LOOKUP: 获取topicName的broker地址
-
CONNECT: producer连接 进行权限校验
-
PRODUCER: 检查topic的权限
-
SEND: 发送消息
Producer#publishMessage -> PersistentTopic#publishMessage -> ManagedLedgerImpl#asyncAddEntry -> OpAddEntry#initiate -> LedgerHandle#asyncAddEntry[bookkeeper] -> OpAddEntry#addComplete -> EntryCache#insert + PersistentTopic#addComplete[flush response] + ManagedLedgerImpl#notifyCursors[唤醒等待的读] -> ManagedCursorImpl#notifyEntriesAvailable -> ManagedLedgerImpl#asyncReadEntries -> 参照consumer
-
CLOSE_PRODUCER: 通知broker关闭该producer
faq:
- 为什么执行Connect两次
- 前三次的客户端协议: Connect PARTITIONED_METADATA LOOKUP 是和 client.conf 中填写的brokerServiceUrl的地址进行交互的, brokerServiceUrl充当了类似 管理服务器.
- 后面三次的客户端协议, 是和topic的owner broker进行交互的. 这里因为是没有区分partition的producer, 所以只有一个CONNECT 和 PRODUCER, 不然, 有几个partition就有几个 CONNECT 和 PRODUCER. 具体的参照: ClientCnx#channelActive 和 ProducerImpl#connectionOpened
99 Words
2019-03-03 18:35 +0800