这篇文章会对 consumer 进行分析.

入口

这里通过org.apache.pulsar.client.tutorial.SampleConsumer#main进行分析.

consumer 实现

通过debug分析可以发现, consumer底层只有 ConsumerImpl、ZeroQueueConsumerImpl、 MultiTopicsConsumerImpl 和 PatternMultiTopicsConsumerImpl 四种. 其中, ConsumerImpl是基础的实现, 其他是基于ConsumerImpl进行的封装和组合处理. consumer和broker之间的协议交互通过 ClientCnx 进行处理, 协议分析参照链接.

这里针对四种consumer实现的差异进行说明:

  • ConsumerImpl
    • 实例化最后, 会调用方法grabCnx, 触发了连接的初始化工作: 创建连接, 发送 CONNECT command消息, 发送SUBSCRIBE command消息, 发送Flow command消息, 最终实现注册和服务端消息的推送.
    • 服务端推送的消息会放到 incomingMessages 队列.
    • 消息通过显式的调用 internalReceive() 从incomingMessages队列中取消息进行消费.
  • ZeroQueueConsumerImpl
    • 继承ConsumerImpl进行实现, 但是重写了方法 canEnqueueMessage() 和 internalReceive() 以及其他方法 (其他的过于琐碎,不进行讨论)
    • 重写 internalReceive(), 每次调用会清空 incomingMessages, 没有及时处理的消息就没有了. 然后主动Flow command 请求一条消息, 等待获取消息进行处理
    • 重写 canEnqueueMessage(), 当 listener 方式处理消息, 直接回调, 那么, 消息就不会丢失
  • MultiTopicsConsumerImpl
    • 多个consumer的集合.
    • 初始化的时候, 就会直接启动消息的接收.
  • PatternMultiTopicsConsumerImpl
    • 继承 MultiTopicsConsumerImpl, 定时监听topic的变化,进行sub/unsub

主要方法分析

subscribe

调用链: ConsumerBuilderImpl#subscribe -> PulsarClientImpl#subscribeAsync -> 分类讨论
  1. pattern模式

    获取namespace下的所有topic, 然后进行过滤, 实例化 PatternMultiTopicsConsumerImpl

  2. 单topic模式

调用链: PulsarClientImpl#singleTopicSubscribeAsync -> #doSingleTopicSubscribeAsync -> ConsumerImpl#newConsumerImpl / MultiTopicsConsumerImpl#createPartitionedConsumer

单topic中, 根据 topic partition > 1 ? MultiTopicsConsumerImpl : ConsumerImpl 进行处理, 在 MultiTopicsConsumerImpl 实现中, 每个 topic partition 都是 一个topic, 会生成一个 ZeroQueueConsumerImpl/ConsumerImpl, 并进行 Flow协议进行消息接收, 当消息到来的时候, 会将消息放到 incomingMessages队列, 等待 receive或者 listener函数异步处理. 而在 ConsumerImpl实现中, 实例化后不做任何处理

  1. 多topic模式

    实例化 MultiTopicsConsumerImpl

receive

通过上面的consumer底层分析, 可以知道, receive 其实是通过 queue 传递给上层获取的. 不做赘述.

总结

不足

  1. ZeroQueueConsumerImpl 实现不够简洁, 继承ConsumerImpl,导致很多多余的逻辑. 尤其是 incomingMessages 队列, 在 ZeroQueueConsumerImpl 是多余的存储