One minute
Qmq Producer
启动协议
- producer -> meta-server: CLIENT_REGISTER: 注册, 获取topic原信息
- producer -> broker: SEND_MESSAGE: 发消息到指定的broker
消息的发送流程
producer
MessageProducerProvider#sendMessage -> ProduceMessageImpl#send...#doSend...#sendSync -> RPCQueueSender#send...process -> MessageSenderGroup#send -> NettyConnection#send...#doSend -> NettyProducerClient#sendMessage -> NettyClient#sendSync...sendAsync
主要的代码:
- NettyClient: 负责底层连接的创建和消息的发送
- NettyProducerClient: 简单的封装
- BrokerLoadBalance: 负载均衡消息到集群中broker
- NettyConnection: queue的消息处理器
- MessageSenderGroup: 消息发送和异常处理的封装
- RPCQueueSender: 通过RouterManager将消息遍历投递给NettyConnection
- RouterManager: 路由消息应该进入哪一个队列
broker
处理流程如下
SendMessageProcessor#processRequest -> SendMessageWorker#receive -> MessageStoreWrapper#putMessage -> DefaultStorage#appendMessage -> MessageLog#putMessage
-> LogSegment#append -> RawMessageAppender#doAppend
broker使用LogManager维护log, 提供最新的log进行mmap方式写入.
除了正常的消息投递, broker还会异步构建consumer queue. 相关处理流程如下:
Dispatcher#run...processLog -> FixedExecOrderEventBus#post -> BuildConsumerLogEventListener#onEvent -> ConsumerLog#putMessageLogOffset -> LogSegment#append -> ConsumerLogMessageAppender#doAppend
入口是在 ActionLogIterateService内部类 Dispatcher中, 通过不断读取 message Log构建consumer log.
扩容怎么处理
扩容过程中, 会收到发送失败的异常,然后拉取最新的路由列表, 重新发送
85 Words
2019-03-23 09:34 +0800