One minute
Pulsar Cleint Consumer
这篇文章会对 consumer 进行分析.
入口
这里通过org.apache.pulsar.client.tutorial.SampleConsumer#main进行分析.
consumer 实现
通过debug分析可以发现, consumer底层只有 ConsumerImpl、ZeroQueueConsumerImpl、 MultiTopicsConsumerImpl 和 PatternMultiTopicsConsumerImpl 四种. 其中, ConsumerImpl是基础的实现, 其他是基于ConsumerImpl进行的封装和组合处理. consumer和broker之间的协议交互通过 ClientCnx 进行处理, 协议分析参照链接.
这里针对四种consumer实现的差异进行说明:
- ConsumerImpl
- 实例化最后, 会调用方法grabCnx, 触发了连接的初始化工作: 创建连接, 发送 CONNECT command消息, 发送SUBSCRIBE command消息, 发送Flow command消息, 最终实现注册和服务端消息的推送.
- 服务端推送的消息会放到 incomingMessages 队列.
- 消息通过显式的调用 internalReceive() 从incomingMessages队列中取消息进行消费.
- ZeroQueueConsumerImpl
- 继承ConsumerImpl进行实现, 但是重写了方法 canEnqueueMessage() 和 internalReceive() 以及其他方法 (其他的过于琐碎,不进行讨论)
- 重写 internalReceive(), 每次调用会清空 incomingMessages, 没有及时处理的消息就没有了. 然后主动Flow command 请求一条消息, 等待获取消息进行处理
- 重写 canEnqueueMessage(), 当 listener 方式处理消息, 直接回调, 那么, 消息就不会丢失
- MultiTopicsConsumerImpl
- 多个consumer的集合.
- 初始化的时候, 就会直接启动消息的接收.
- PatternMultiTopicsConsumerImpl
- 继承 MultiTopicsConsumerImpl, 定时监听topic的变化,进行sub/unsub
主要方法分析
subscribe
调用链: ConsumerBuilderImpl#subscribe -> PulsarClientImpl#subscribeAsync -> 分类讨论
-
pattern模式
获取namespace下的所有topic, 然后进行过滤, 实例化 PatternMultiTopicsConsumerImpl
-
单topic模式
调用链: PulsarClientImpl#singleTopicSubscribeAsync -> #doSingleTopicSubscribeAsync -> ConsumerImpl#newConsumerImpl / MultiTopicsConsumerImpl#createPartitionedConsumer
单topic中, 根据 topic partition > 1 ? MultiTopicsConsumerImpl : ConsumerImpl 进行处理, 在 MultiTopicsConsumerImpl 实现中, 每个 topic partition 都是 一个topic, 会生成一个 ZeroQueueConsumerImpl/ConsumerImpl, 并进行 Flow协议进行消息接收, 当消息到来的时候, 会将消息放到 incomingMessages队列, 等待 receive或者 listener函数异步处理. 而在 ConsumerImpl实现中, 实例化后不做任何处理
-
多topic模式
实例化 MultiTopicsConsumerImpl
receive
通过上面的consumer底层分析, 可以知道, receive 其实是通过 queue 传递给上层获取的. 不做赘述.
总结
- 无
不足
- ZeroQueueConsumerImpl 实现不够简洁, 继承ConsumerImpl,导致很多多余的逻辑. 尤其是 incomingMessages 队列, 在 ZeroQueueConsumerImpl 是多余的存储
151 Words
2019-03-03 09:59 +0800