流程分析-消息接收:

consumer 收消息

MQConsumerInner有DefaultMQPushConsumerImpl&DefaultMQPullConsumerImpl两种模式, 同时区分有序性和并发两种, 这里先看 push&并发的实现:

DefaultMQPushConsumer#start -> DefaultMQPushConsumerImpl#start -> MQClientInstance#start -> RebalanceService#start -> #run -> MQClientInstance#doRebalance -> RebalanceImpl#doRebalance  ...#updateProcessQueueTableInRebalance -> RebalancePushImpl#dispatchPullRequest -> DefaultMQPushConsumerImpl#executePullRequestImmediately -> PullMessageService#executePullRequestImmediately -> #run -> #pullMessage  -> DefaultMQPushConsumerImpl#pullMessage -> PullAPIWrapper#pullKernelImpl -> MQClientAPIImpl#pullMessage -> ConsumeMessageConcurrentlyService#submitConsumeRequest -> ConsumeRequest#run -> MessageListenerConcurrently#consumeMessage

总结:

  • push模式中, 通过rebalance定期触发消费消息
  • pull模式中, 需要手动拉取.
  • 无论是push模式还是pull模式, 最终都是调用 PullAPIWrapper#pullKernelImpl 实现的

broker 接受消息

PullMessageProcessor:

    PullMessageProcessor#processRequest -> DefaultMessageStore#getMessage -> ConsumeQueue.getIndexBuffer  + CommitLog#getMessage 

逻辑: 先从ConsumeQueue查找索引内容, 再到 CommitLog 中读取文件内容.

特点:

  • zeroCopy的支持:

    消费消息,下发消息的时候: 通过继承Netty FileRegion的实现 ManyMessageTransfer, 最终实现 文件传输的zerocopy 传输, 和kafka类似, 也支持 []byte 的拷贝, rocketmq 通过参数配置

  • 阻塞通知机制

    当consumer消费消息的时候, 没有消息存在的情况下, broker 会阻塞起调用, 当consume queue有消息的时候, 在通知consumer. 超时时间 5000 ms.

faq: 如何确定消息关系

实现接口 AllocateMessageQueueStrategy, 默认实现有

  • AllocateMachineRoomNearby、
  • AllocateMessageQueueAveragely、
  • AllocateMessageQueueAveragelyByCircle、
  • AllocateMessageQueueByConfig、
  • AllocateMessageQueueByMachineRoom、
  • AllocateMessageQueueConsistentHash,
  • 默认是 AllocateMessageQueueAveragely, 注意, 只有Clustering模式才需要负载均衡, 广播模式中 每个consumer订阅所有的 queue.

faq: rebalance

RebalanceService定期执行, 调用路径如下

RebalanceService#run -> MQClientInstance#doRebalance -> MQConsumerInner#doRebalance -> RebalanceImpl#doRebalance...#rebalanceByTopic...#updateProcessQueueTableInRebalance...#messageQueueChanged

faq: offset管理

consumer offset 在broker中是存储在 本地磁盘上的, 并且只保存 每个queue最新的offset. 保存的操作不是实时的, 是通过定时任务执行的. 具体实现可以参看 BrokerController#initialize.

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.consumerOffsetManager.persist();
                    } catch (Throwable e) {
                        log.error("schedule persist consumerOffset error.", e);
                    }
                }
            }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

简述:

  • rebalance通过consumer端定期调用执行的.
  • rebalance过程中, 会重新进行 queue分配, 根据比较和上次分配结果的差异, 判断是否需要处理
  • 在 pull 模式中, 当发生 rebalance 的时候, 会取消不存在queue的PullTaskImpl, 同时对新的queue,添加PullTaskImpl, 参看类 MQPullConsumerScheduleService
  • 在 push模式中, 触发 PullMessageService 立即执行 新分配的的queue的PullRequest

特殊的地方

CONSUME_FROM_LAST_OFFSET 如果偏移量0还在磁盘中, 是从头开始消费的, 而不是最新. 参考: https://juejin.im/post/5d72724cf265da03be48fd24