One minute
Rocketmq Broker Consume
流程分析-消息接收:
consumer 收消息
MQConsumerInner有DefaultMQPushConsumerImpl&DefaultMQPullConsumerImpl两种模式, 同时区分有序性和并发两种, 这里先看 push&并发的实现:
DefaultMQPushConsumer#start -> DefaultMQPushConsumerImpl#start -> MQClientInstance#start -> RebalanceService#start -> #run -> MQClientInstance#doRebalance -> RebalanceImpl#doRebalance ...#updateProcessQueueTableInRebalance -> RebalancePushImpl#dispatchPullRequest -> DefaultMQPushConsumerImpl#executePullRequestImmediately -> PullMessageService#executePullRequestImmediately -> #run -> #pullMessage -> DefaultMQPushConsumerImpl#pullMessage -> PullAPIWrapper#pullKernelImpl -> MQClientAPIImpl#pullMessage -> ConsumeMessageConcurrentlyService#submitConsumeRequest -> ConsumeRequest#run -> MessageListenerConcurrently#consumeMessage
总结:
- push模式中, 通过rebalance定期触发消费消息
- pull模式中, 需要手动拉取.
- 无论是push模式还是pull模式, 最终都是调用 PullAPIWrapper#pullKernelImpl 实现的
broker 接受消息
PullMessageProcessor:
PullMessageProcessor#processRequest -> DefaultMessageStore#getMessage -> ConsumeQueue.getIndexBuffer + CommitLog#getMessage
逻辑: 先从ConsumeQueue查找索引内容, 再到 CommitLog 中读取文件内容.
特点:
-
zeroCopy的支持:
消费消息,下发消息的时候: 通过继承Netty FileRegion的实现 ManyMessageTransfer, 最终实现 文件传输的zerocopy 传输, 和kafka类似, 也支持 []byte 的拷贝, rocketmq 通过参数配置
-
阻塞通知机制
当consumer消费消息的时候, 没有消息存在的情况下, broker 会阻塞起调用, 当consume queue有消息的时候, 在通知consumer. 超时时间 5000 ms.
faq: 如何确定消息关系
实现接口 AllocateMessageQueueStrategy, 默认实现有
- AllocateMachineRoomNearby、
- AllocateMessageQueueAveragely、
- AllocateMessageQueueAveragelyByCircle、
- AllocateMessageQueueByConfig、
- AllocateMessageQueueByMachineRoom、
- AllocateMessageQueueConsistentHash,
- 默认是 AllocateMessageQueueAveragely, 注意, 只有Clustering模式才需要负载均衡, 广播模式中 每个consumer订阅所有的 queue.
faq: rebalance
RebalanceService定期执行, 调用路径如下
RebalanceService#run -> MQClientInstance#doRebalance -> MQConsumerInner#doRebalance -> RebalanceImpl#doRebalance...#rebalanceByTopic...#updateProcessQueueTableInRebalance...#messageQueueChanged
faq: offset管理
consumer offset 在broker中是存储在 本地磁盘上的, 并且只保存 每个queue最新的offset. 保存的操作不是实时的, 是通过定时任务执行的. 具体实现可以参看 BrokerController#initialize.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
简述:
- rebalance通过consumer端定期调用执行的.
- rebalance过程中, 会重新进行 queue分配, 根据比较和上次分配结果的差异, 判断是否需要处理
- 在 pull 模式中, 当发生 rebalance 的时候, 会取消不存在queue的PullTaskImpl, 同时对新的queue,添加PullTaskImpl, 参看类 MQPullConsumerScheduleService
- 在 push模式中, 触发 PullMessageService 立即执行 新分配的的queue的PullRequest
特殊的地方
CONSUME_FROM_LAST_OFFSET 如果偏移量0还在磁盘中, 是从头开始消费的, 而不是最新. 参考: https://juejin.im/post/5d72724cf265da03be48fd24
193 Words
2019-03-13 23:11 +0800