流程分析-消息发送:

producer 发送消息

DefaultMQProducer#send -> DefaultMQProducerImpl#send...#sendDefaultImpl -> #sendKernelImpl -> MQClientAPIImpl#sendMessage -> 不同情况讨论

分类情况:

  • invokeOneway: NettyRemotingClient#invokeOneway, netty channel发送.
  • sendMessageAsync: netty channel 异步发送, 通过回调处理结果
  • sendMessageSync: NettyRemotingClient#invokeSync, netty channel 阻塞等待结果

broker 发送消息

SendMessageProcessor: 负责处理消息的发送 普通消息的发送:

SendMessageProcessor#processRequest -> #sendMessage -> DefaultMessageStore#putMessage -> CommitLog#putMessage -> MappedFile#appendMessage

批量消息的发送:

SendMessageProcessor#processRequest -> #sendBatchMessage-> DefaultMessageStore#putMessages -> CommitLog#putMessages -> MappedFile#appendMessages -> DefaultAppendMessageCallback#doAppend 

值得注意的是, 无论是 SendMessage 还是 putMessages, 最后都会执行下面两个函数

handleDiskFlush(result, putMessageResult, msg);
handleHA(result, putMessageResult, msg);

handleDiskFlush主要是处理磁盘刷新策略的, 主要有同步模式和异步模式, 有三种实现

  • CommitRealTimeService: 异步刷盘
  • FlushRealTimeService: 异步刷盘
  • GroupCommitService: 同步刷盘

ha机制处理: 进行主从同步
其他相关概念: dlq: 死信队列

问题

  1. producer 如何决定向哪个broker发送?
  2. 可以指定messageQueue, 向这个queue所在的brokerName的第一台broker addr发送
  3. 通过实现 MessageQueueSelector 实现消息的发送策略. 默认实现有 SelectMessageQueueByHash、SelectMessageQueueByMachineRoom、SelectMessageQueueByRandom
  4. 默认情况下使用 MQFaultStrategy 方式, 正常情况下是 round robin 方式发送的, 添加了 检活 容错的逻辑处理