One minute
Rocketmq Broker Send
流程分析-消息发送:
producer 发送消息
DefaultMQProducer#send -> DefaultMQProducerImpl#send...#sendDefaultImpl -> #sendKernelImpl -> MQClientAPIImpl#sendMessage -> 不同情况讨论
分类情况:
- invokeOneway: NettyRemotingClient#invokeOneway, netty channel发送.
- sendMessageAsync: netty channel 异步发送, 通过回调处理结果
- sendMessageSync: NettyRemotingClient#invokeSync, netty channel 阻塞等待结果
broker 发送消息
SendMessageProcessor: 负责处理消息的发送 普通消息的发送:
SendMessageProcessor#processRequest -> #sendMessage -> DefaultMessageStore#putMessage -> CommitLog#putMessage -> MappedFile#appendMessage
批量消息的发送:
SendMessageProcessor#processRequest -> #sendBatchMessage-> DefaultMessageStore#putMessages -> CommitLog#putMessages -> MappedFile#appendMessages -> DefaultAppendMessageCallback#doAppend
值得注意的是, 无论是 SendMessage 还是 putMessages, 最后都会执行下面两个函数
handleDiskFlush(result, putMessageResult, msg);
handleHA(result, putMessageResult, msg);
handleDiskFlush主要是处理磁盘刷新策略的, 主要有同步模式和异步模式, 有三种实现
- CommitRealTimeService: 异步刷盘
- FlushRealTimeService: 异步刷盘
- GroupCommitService: 同步刷盘
ha机制处理: 进行主从同步
其他相关概念:
dlq: 死信队列
问题
- producer 如何决定向哪个broker发送?
- 可以指定messageQueue, 向这个queue所在的brokerName的第一台broker addr发送
- 通过实现 MessageQueueSelector 实现消息的发送策略. 默认实现有 SelectMessageQueueByHash、SelectMessageQueueByMachineRoom、SelectMessageQueueByRandom
- 默认情况下使用 MQFaultStrategy 方式, 正常情况下是 round robin 方式发送的, 添加了 检活 容错的逻辑处理
99 Words
2019-03-13 23:07 +0800