Debug

1. Environment setup: set a breakpoint on every case in org.apache.pulsar.common.api.PulsarDecoder#channelRead, then start the broker in debug mode (a simplified sketch of that case dispatch follows this list).

2. Terminal requests:

  • First send a couple of produce requests to accumulate data, and to avoid a consumer polluting the debug flow:
    bin/pulsar-client produce my-topic --messages "hello-pulsar1"
    bin/pulsar-client produce my-topic --messages "hello-pulsar2"
  • Then send the consume request, which is the one to debug:
    bin/pulsar-client consume my-topic -t Shared -s demo-sub2 -n 0
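
For orientation when setting the breakpoints from step 1: the decoder dispatches every inbound command to a per-type handler, so one breakpoint per case shows exactly which protocol command just arrived. Below is a self-contained, simplified sketch of that dispatch shape; CommandType and the handle* methods are hypothetical stand-ins, not the actual Pulsar classes.

// Simplified sketch of the dispatch shape inside a decoder's channelRead.
// CommandType and the handle* methods are stand-ins; the real places to set
// breakpoints are the cases in org.apache.pulsar.common.api.PulsarDecoder#channelRead.
enum CommandType { CONNECT, PARTITIONED_METADATA, LOOKUP, SUBSCRIBE, FLOW, ACK, CLOSE_CONSUMER }

abstract class DecoderSketch {
    void channelRead(CommandType type) {
        switch (type) {
            case CONNECT:              handleConnect(); break;            // breakpoint per case
            case PARTITIONED_METADATA: handlePartitionMetadata(); break;
            case LOOKUP:               handleLookup(); break;
            case SUBSCRIBE:            handleSubscribe(); break;
            case FLOW:                 handleFlow(); break;
            case ACK:                  handleAck(); break;
            case CLOSE_CONSUMER:       handleCloseConsumer(); break;
            default:                   throw new IllegalStateException("unhandled: " + type);
        }
    }

    abstract void handleConnect();
    abstract void handlePartitionMetadata();
    abstract void handleLookup();
    abstract void handleSubscribe();
    abstract void handleFlow();
    abstract void handleAck();
    abstract void handleCloseConsumer();
}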

Server-side interaction

Debugging shows that the consumer mainly goes through the following sequence of commands:

CONNECT -> PARTITIONED_METADATA -> LOOKUP -> CONNECT -> SUBSCRIBE -> FLOW -> ACK -> CLOSE_CONSUMER
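
The same sequence can be reproduced with the Java client instead of the CLI. A minimal consumer like the one below (standard org.apache.pulsar.client.api usage; topic and subscription names match the CLI commands above, and the service URL assumes a local broker on the default port) drives CONNECT, PARTITIONED_METADATA, LOOKUP, the second CONNECT, SUBSCRIBE, FLOW, ACK and, on close, CLOSE_CONSUMER on the broker side.

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

// Java-client equivalent of `bin/pulsar-client consume my-topic -t Shared -s demo-sub2 -n 0`,
// useful for stepping through the same broker-side command sequence.
public class ConsumeDemo {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")      // lookup broker from the config
                .build();                                    // -> CONNECT (lookup connection)
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("my-topic")
                .subscriptionName("demo-sub2")
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();           // -> PARTITIONED_METADATA, LOOKUP, CONNECT, SUBSCRIBE, FLOW
        Message<byte[]> msg = consumer.receive();
        consumer.acknowledge(msg);      // -> ACK
        consumer.close();               // -> CLOSE_CONSUMER
        client.close();
    }
}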

Analyzing the code gives the following (note: arrows denote call relationships):

1. CONNECT: connect to the lookup service and perform authentication.

2. PARTITIONED_METADATA: get the number of partitions of the topic.

3. CONNECT: the consumer connection, again with authentication.

4. LOOKUP: get the address of the broker that owns the topic.

5. SUBSCRIBE: subscribe to the topic.

ServerCnx#handleSubscribe -> PersistentTopic#subscribe -> PersistentSubscription#addConsumer -> PersistentDispatcherMultipleConsumers#addConsumer 
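
To make the addConsumer chain concrete, here is a minimal sketch of the idea behind it, assuming a simplified model: the subscription owns a single dispatcher, chosen by subscription type on the first subscribe, and every new consumer is registered with that dispatcher. The class and method names below are illustrative stand-ins, not the actual Pulsar implementation.

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Minimal sketch of the subscribe path: the subscription keeps a single dispatcher,
// picked by subscription type, and addConsumer registers each consumer with it.
// Illustrative stand-ins only, not the real PersistentSubscription/dispatcher classes.
class SubscriptionSketch {
    enum SubType { EXCLUSIVE, FAILOVER, SHARED }

    interface Dispatcher {
        void addConsumer(String consumerName);
    }

    // Exclusive/Failover style: keeps all consumers but only one is "active".
    static class SingleActiveConsumerDispatcher implements Dispatcher {
        final List<String> consumers = new CopyOnWriteArrayList<>();
        public void addConsumer(String consumerName) {
            consumers.add(consumerName);
            // active consumer = consumers.get(partitionIndex % consumers.size())
        }
    }

    // Shared style: every consumer can receive messages.
    static class MultipleConsumersDispatcher implements Dispatcher {
        final List<String> consumers = new CopyOnWriteArrayList<>();
        public void addConsumer(String consumerName) {
            consumers.add(consumerName);
        }
    }

    private Dispatcher dispatcher;

    synchronized void addConsumer(String consumerName, SubType type) {
        if (dispatcher == null) {
            dispatcher = (type == SubType.SHARED)
                    ? new MultipleConsumersDispatcher()
                    : new SingleActiveConsumerDispatcher();
        }
        dispatcher.addConsumer(consumerName);
    }
}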

6. FLOW: consume messages.

ServerCnx#handleFlow -> Consumer#flowPermits -> PersistentSubscription#consumerFlow -> PersistentDispatcherMultipleConsumers#consumerFlow...readMoreEntries -> ManagedCursorImpl#asyncReadEntriesOrWait...#asyncReadEntries -> ManagedLedgerImpl#asyncReadEntries... -> EntryCacheImpl#asyncReadEntry -> bookkeeper#ReadHandle.. -> callbacks bubble back up through the read path -> OpReadEntry#readEntriesComplete...#checkReadCompletion -> PersistentDispatcherMultipleConsumers#readEntriesComplete -> Consumer#sendMessages
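
The FLOW command is essentially permit-based flow control: the consumer grants a number of permits, and the dispatcher only reads and pushes entries while permits remain, reading more once new permits arrive. A minimal, self-contained sketch of that idea follows; it is not the actual Pulsar dispatcher, and the class and method names are stand-ins.

import java.util.ArrayDeque;
import java.util.Queue;

// Minimal sketch of permit-based flow control behind the FLOW command.
// The consumer grants permits; the dispatcher only pushes entries while permits remain.
class FlowControlSketch {
    private int availablePermits = 0;
    private final Queue<String> backlog = new ArrayDeque<>();

    void addEntry(String entry) {
        backlog.add(entry);
        dispatch();
    }

    // Corresponds to the handleFlow/flowPermits step: the consumer asks for `permits` more messages.
    void flowPermits(int permits) {
        availablePermits += permits;
        dispatch();                      // in the real broker this may trigger readMoreEntries
    }

    private void dispatch() {
        while (availablePermits > 0 && !backlog.isEmpty()) {
            String entry = backlog.poll();
            availablePermits--;          // each pushed message consumes one permit
            System.out.println("send to consumer: " + entry);
        }
    }

    public static void main(String[] args) {
        FlowControlSketch s = new FlowControlSketch();
        s.addEntry("m1");
        s.addEntry("m2");
        s.flowPermits(1);                // only m1 is pushed; m2 waits for more permits
        s.flowPermits(1);                // now m2 is pushed
    }
}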

7. ACK: acknowledge messages.

Consumer.messageAcked -> PersistentSubscription.acknowledgeMessage -> ManagedCursorImpl.asyncDelete -> LedgerHandle.asyncAddEntry
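
On the cursor side, acknowledging a message records its position as deleted and advances the mark-delete point over any contiguous acked prefix; the real ManagedCursorImpl then persists that position, which is the LedgerHandle.asyncAddEntry at the end of the chain. Below is a minimal sketch of that bookkeeping, with hypothetical names and positions reduced to plain longs.

import java.util.TreeSet;

// Minimal sketch of cursor acknowledgement bookkeeping: individually acked positions are
// recorded, and the mark-delete position advances over any contiguous acked prefix.
// Illustrative only; the real ManagedCursorImpl also persists the position to BookKeeper.
class CursorSketch {
    private long markDeletePosition = -1;          // everything <= this is acknowledged
    private final TreeSet<Long> individuallyAcked = new TreeSet<>();

    void acknowledge(long position) {
        if (position <= markDeletePosition) {
            return;                                // already acknowledged
        }
        individuallyAcked.add(position);
        // Advance mark-delete while the next position has also been acked.
        while (individuallyAcked.remove(markDeletePosition + 1)) {
            markDeletePosition++;
        }
        // Real broker: persist markDeletePosition (ManagedCursorImpl -> LedgerHandle.asyncAddEntry).
    }

    @Override public String toString() {
        return "markDelete=" + markDeletePosition + ", individuallyAcked=" + individuallyAcked;
    }

    public static void main(String[] args) {
        CursorSketch cursor = new CursorSketch();
        cursor.acknowledge(2);                     // out-of-order ack: mark-delete stays at -1
        cursor.acknowledge(0);                     // mark-delete -> 0
        cursor.acknowledge(1);                     // mark-delete -> 2 (0,1,2 all acked)
        System.out.println(cursor);
    }
}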

8. CLOSE_CONSUMER: close the consumer.

handleCloseConsumer -> remove the consumer from consumers, Consumer.close -> PersistentSubscription.removeConsumer -> XXXDispatcher.removeConsumer

Detailed analysis

Pulsar offers three subscription modes: exclusive, shared, and failover. The flow above was analyzed mainly for shared mode; the differences between the modes lie in steps 4, 5, and 6, which are expanded together here:

  1. exclusive allows only a single consumer to subscribe; that consumer is stored as the ActiveConsumer, and dispatch only ever goes to the ActiveConsumer. See org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.
  2. failover allows multiple consumers to subscribe, but only one of them is the active consumer. The overall handling is similar to exclusive mode and uses the same class, org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer, with one addition: the logic for picking the active consumer, see AbstractDispatcherSingleActiveConsumer#pickAndScheduleActiveConsumer:
protected boolean pickAndScheduleActiveConsumer() {
        ....
        // pick the active consumer for this partition: partition index modulo the consumer count
        int index = partitionIndex % consumers.size();
        Consumer prevConsumer = ACTIVE_CONSUMER_UPDATER.getAndSet(this, consumers.get(index));

        Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
        ....
}

For a standalone (non-partitioned) topic in failover mode, index can only be 0, so this code looks redundant. For a partitioned-topic consumer, however, where there are fewer consumers than partitions, consumers keep joining and the active consumer keeps changing; the implementation therefore notifies the new active consumer with a delay.

  3. shared allows multiple consumers to subscribe, and messages on the topic are no longer consumed in order. How, then, are messages distributed across the consumers? When messages need to be dispatched, getNextConsumer is called (a simplified round-robin sketch follows the excerpt below); see PersistentDispatcherMultipleConsumers#readEntriesComplete:

@Override
public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
    ....
    // keep dispatching while entries remain, permits are available, and at least one consumer is live
    while (entriesToDispatch > 0 && totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
        Consumer c = getNextConsumer();
        ....
        if (messagesForC > 0) {
            ....
            // hand this consumer its share of the freshly read entries
            SendMessageInfo sentMsgInfo = c.sendMessages(entries.subList(start, start + messagesForC));
            ....
        }
        ....
    }
    ....
}
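
As promised above for shared mode, here is a minimal round-robin sketch of the getNextConsumer idea: walk the consumer list from the last pick and return the next consumer that still has permits. The names and the exact selection policy are simplified stand-ins, not the actual PersistentDispatcherMultipleConsumers logic.

import java.util.ArrayList;
import java.util.List;

// Minimal sketch of shared-mode message distribution: round-robin over consumers,
// skipping any consumer that has no permits left.
class SharedDispatchSketch {
    static class ConsumerStub {
        final String name;
        int availablePermits;
        ConsumerStub(String name, int permits) { this.name = name; this.availablePermits = permits; }
    }

    private final List<ConsumerStub> consumers = new ArrayList<>();
    private int currentIndex = 0;

    void addConsumer(ConsumerStub c) { consumers.add(c); }

    // Pick the next consumer (round-robin) that still has permits; null if none do.
    ConsumerStub getNextConsumer() {
        for (int i = 0; i < consumers.size(); i++) {
            ConsumerStub c = consumers.get(currentIndex % consumers.size());
            currentIndex = (currentIndex + 1) % consumers.size();
            if (c.availablePermits > 0) {
                return c;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        SharedDispatchSketch d = new SharedDispatchSketch();
        d.addConsumer(new ConsumerStub("c1", 2));
        d.addConsumer(new ConsumerStub("c2", 1));
        for (String msg : new String[] {"m1", "m2", "m3"}) {
            ConsumerStub c = d.getNextConsumer();
            c.availablePermits--;
            System.out.println(msg + " -> " + c.name);   // m1 -> c1, m2 -> c2, m3 -> c1
        }
    }
}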

FAQ

  1. When is the rebalance logic triggered, and how is it implemented?
  • When the broker socket is closed, all producers/consumers on that connection are closed as well.
  • On heartbeat timeout the consumer is evicted; in failover mode this triggers re-election of the active consumer, and the same applies to shared mode.
  2. Why is CONNECT sent twice?
    • Same answer as before: the consumer first connects to the brokerUrl from its configuration, and once it has obtained the topic owner's address it connects to the owner broker. If it subscribes to several topics, there will be a corresponding number of CONNECT and SUBSCRIBE requests.