Pulsar Broker Consumer Proto
Debug
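1. Environment setup: set a breakpoint on every case in org.apache.pulsar.common.api.PulsarDecoder#channelRead, then start the broker in debug mode.
2. Terminal requests:
- First send produce requests to accumulate data. Doing this before the consumer starts keeps consumer traffic from polluting the debug flow:
  bin/pulsar-client produce my-topic --messages "hello-pulsar1"
  bin/pulsar-client produce my-topic --messages "hello-pulsar2"
- Then send the consume request and follow it in the debugger:
  bin/pulsar-client consume my-topic -t Shared -s demo-sub2 -n 0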
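Server-side interaction
Debugging shows that the consumer goes through the following commands, in this order:
CONNECT, PARTITIONED_METADATA, LOOKUP, CONNECT, SUBSCRIBE, FLOW, ACK, CLOSE_CONSUMER
The code-level analysis follows (note: arrows denote call relationships):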
1. CONNECT: connect to the lookup service and perform the authorization check
2. PARTITIONED_METADATA: get the number of partitions of the topic
3. LOOKUP: get the address of the broker that serves topicName
4. CONNECT: consumer connection, with another authorization check
5. SUBSCRIBE: subscribe to the topic
ServerCnx#handleSubscribe -> PersistentTopic#subscribe -> PersistentSubscription#addConsumer -> PersistentDispatcherMultipleConsumers#addConsumer
6. FLOW: consume messages
ServerCnx#handleFlow -> Consumer#flowPermits -> PersistentSubscription#consumerFlow -> PersistentDispatcherMultipleConsumers#consumerFlow...readMoreEntries -> ManagedCursorImpl#asyncReadEntriesOrWait...#asyncReadEntries -> ManagedLedgerImpl#asyncReadEntries... -> EntryCacheImpl#asyncReadEntry -> bookkeeper#ReadHandle.. -> nested callbacks -> OpReadEntry#readEntriesComplete...#checkReadCompletion -> PersistentDispatcherMultipleConsumers#readEntriesComplete -> Consumer#sendMessages
7. ACK: acknowledge messages
Consumer.messageAcked -> PersistentSubscription.acknowledgeMessage -> ManagedCursorImpl.asyncDelete -> LedgerHandle.asyncAddEntry
8. CLOSE_CONSUMER: close the consumer
handleCloseConsumer -> remove the consumer from consumers, Consumer.close -> PersistentSubscription.removeConsumer -> XXXDispatcher.removeConsumer
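The command order recorded in the debug session can be written down as a small checkable model. This is only a sketch: the class `ConsumerCommandSequence`, the method `isPlausible`, and the ordering rules it enforces are assumptions made for illustration, not Pulsar code and not rules taken from the protocol specification.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of the command order seen while debugging; not Pulsar code. */
class ConsumerCommandSequence {
    enum Command { CONNECT, PARTITIONED_METADATA, LOOKUP, SUBSCRIBE, FLOW, ACK, CLOSE_CONSUMER }

    /** The order recorded at the PulsarDecoder#channelRead breakpoints. */
    static final List<Command> OBSERVED = Arrays.asList(
            Command.CONNECT, Command.PARTITIONED_METADATA, Command.LOOKUP,
            Command.CONNECT, Command.SUBSCRIBE, Command.FLOW, Command.ACK,
            Command.CLOSE_CONSUMER);

    /**
     * Assumed ordering rules: everything needs a prior CONNECT, FLOW needs a
     * prior SUBSCRIBE, ACK needs a prior FLOW, nothing follows CLOSE_CONSUMER.
     */
    static boolean isPlausible(List<Command> trace) {
        boolean connected = false, subscribed = false, flowed = false, closed = false;
        for (Command c : trace) {
            if (closed) return false;
            switch (c) {
                case CONNECT: connected = true; break;
                case PARTITIONED_METADATA:
                case LOOKUP: if (!connected) return false; break;
                case SUBSCRIBE: if (!connected) return false; subscribed = true; break;
                case FLOW: if (!subscribed) return false; flowed = true; break;
                case ACK: if (!flowed) return false; break;
                case CLOSE_CONSUMER: if (!subscribed) return false; closed = true; break;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("observed trace plausible: " + isPlausible(OBSERVED));
    }
}
```

A trace captured from another debug run can be checked the same way, which makes it easy to spot a command arriving earlier than expected.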
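Detailed analysis
According to Pulsar's subscription modes, there are three modes: exclusive, shared, and failover. The flow above is for shared mode. The modes differ in steps 4, 5, and 6, which are covered together here:
- exclusive allows only one Consumer to subscribe. It is stored as the ActiveConsumer, and messages are dispatched only to the ActiveConsumer. See the class org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.
- failover allows multiple Consumers to subscribe, but only one of them is the Active Consumer. The overall logic is similar to exclusive mode and uses the same class, org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer, with additional logic for choosing the active consumer. See AbstractDispatcherSingleActiveConsumer#pickAndScheduleActiveConsumer: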
protected boolean pickAndScheduleActiveConsumer() {
    ....
    int index = partitionIndex % consumers.size();
    Consumer prevConsumer = ACTIVE_CONSUMER_UPDATER.getAndSet(this, consumers.get(index));
    Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
    ....
}
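To see how the index computation above behaves as consumers join, here is a minimal standalone sketch. The class `ActiveConsumerPick`, the method `pick`, and the consumer names are hypothetical, and the sketch assumes the consumers list is kept in join order and that partitionIndex is non-negative; the real dispatcher may order its list differently.

```java
import java.util.ArrayList;
import java.util.List;

/** Standalone sketch of the active-consumer index computation; names are made up. */
class ActiveConsumerPick {
    /** Mirrors `partitionIndex % consumers.size()` from the excerpt above. */
    static String pick(int partitionIndex, List<String> consumers) {
        return consumers.get(partitionIndex % consumers.size());
    }

    public static void main(String[] args) {
        List<String> consumers = new ArrayList<>();
        // Add consumers one at a time and print the pick for partitions 0 and 3.
        for (String name : new String[] {"c0", "c1", "c2", "c3"}) {
            consumers.add(name);
            System.out.println(consumers.size() + " consumer(s): partition 0 -> "
                    + pick(0, consumers) + ", partition 3 -> " + pick(3, consumers));
        }
    }
}
```

With index 0 the pick never moves off the first consumer, while for partition 3 it changes each time a consumer is added until four consumers are present.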
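For a single (non-partitioned) topic in failover mode, index can only be 0, so this code is redundant. For a partitioned-topic consumer, however, while the number of consumers is smaller than the partition index, adding consumers keeps changing the active consumer; the implementation includes a delayed notification of the active consumer.
- shared allows multiple Consumers to subscribe, and messages on the topic are not delivered in order. How, then, are messages assigned to consumers? When messages need to be dispatched, the method getNextConsumer is called, as shown in PersistentDispatcherMultipleConsumers#readEntriesComplete: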
@Override
public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
    ....
    while (entriesToDispatch > 0 && totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
        Consumer c = getNextConsumer();
        ....
        if (messagesForC > 0) {
            ....
            SendMessageInfo sentMsgInfo = c.sendMessages(entries.subList(start, start + messagesForC));
            ....
        }
        ....
    }
    ....
}
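The loop above can be approximated by a much simpler model in which consumers are visited round-robin and each one receives at most as many entries as it has permits (the permits granted through FLOW). This is a sketch under stated assumptions: `RoundRobinDispatchSketch`, `SketchConsumer`, and the `maxBatch` parameter are hypothetical, and plain round-robin is assumed here; the real getNextConsumer may apply further rules that this sketch ignores.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of permit-bounded round-robin dispatch; not the Pulsar implementation. */
class RoundRobinDispatchSketch {
    /** Hypothetical stand-in for a subscribed consumer and its remaining permits. */
    static final class SketchConsumer {
        final String name;
        int permits;
        final List<String> received = new ArrayList<>();
        SketchConsumer(String name, int permits) { this.name = name; this.permits = permits; }
    }

    private final List<SketchConsumer> consumers;
    private int next = 0; // round-robin cursor

    RoundRobinDispatchSketch(List<SketchConsumer> consumers) { this.consumers = consumers; }

    /** Returns the next consumer that still has permits, or null if none has. */
    SketchConsumer getNextConsumer() {
        for (int i = 0; i < consumers.size(); i++) {
            SketchConsumer c = consumers.get((next + i) % consumers.size());
            if (c.permits > 0) {
                next = (next + i + 1) % consumers.size();
                return c;
            }
        }
        return null;
    }

    /** Hands out entries in batches of at most maxBatch; returns how many were dispatched. */
    int dispatch(List<String> entries, int maxBatch) {
        int start = 0;
        while (start < entries.size()) {
            SketchConsumer c = getNextConsumer();
            if (c == null) break; // no permits left on any consumer
            int n = Math.min(Math.min(entries.size() - start, c.permits), maxBatch);
            c.received.addAll(entries.subList(start, start + n));
            c.permits -= n;
            start += n;
        }
        return start;
    }
}
```

Entries that cannot be dispatched because every consumer has run out of permits stay undelivered in this model until some consumer grants more permits.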
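FAQ
- When is the rebalance logic triggered, and how is it implemented?
- When the broker socket is closed, both the producer and the consumer are closed.
- On heartbeat timeout, the consumer is evicted. In failover mode this triggers a consumer election, and the same applies to shared mode.
- Why is CONNECT sent twice?
- The answer is the same as before: the consumer first connects to the brokerUrl from the configuration file. Once it has obtained the topic's address, it connects to the topic's owner broker. If it subscribes to multiple topics, there is a corresponding number of CONNECT and SUBSCRIBE protocol requests.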
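The two CONNECT commands can be illustrated with a toy connection pool that sends CONNECT once per distinct address. This is a minimal sketch, not the client's real pooling code: `ConnectionPoolSketch`, its methods, and the host names are hypothetical, and keying the pool on the address string is an assumption.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model: one connection per address, CONNECT sent when a connection is first used. */
class ConnectionPoolSketch {
    /** Commands "sent on the wire", recorded as "<address> <COMMAND>". */
    final List<String> wire = new ArrayList<>();
    private final Set<String> connected = new HashSet<>();

    private void send(String address, String command) {
        if (connected.add(address)) {
            wire.add(address + " CONNECT"); // first use of this address
        }
        wire.add(address + " " + command);
    }

    /** serviceUrl is the configured brokerUrl; ownerBroker is what the lookup is assumed to return. */
    void subscribe(String serviceUrl, String ownerBroker) {
        send(serviceUrl, "PARTITIONED_METADATA");
        send(serviceUrl, "LOOKUP");
        send(ownerBroker, "SUBSCRIBE");
    }

    public static void main(String[] args) {
        ConnectionPoolSketch pool = new ConnectionPoolSketch();
        pool.subscribe("host-a", "host-b");
        pool.wire.forEach(System.out::println);
    }
}
```

In this model a second topic owned by a third address would add one more CONNECT and one more SUBSCRIBE, matching the multi-topic case described in the answer above.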
2019-03-02 21:40 +0800