启动协议

  1. producer -> meta-server: CLIENT_REGISTER: 注册, 获取topic原信息
  2. producer -> broker: SEND_MESSAGE: 发消息到指定的broker

消息的发送流程

producer

MessageProducerProvider#sendMessage -> ProduceMessageImpl#send...#doSend...#sendSync -> RPCQueueSender#send...process -> MessageSenderGroup#send -> NettyConnection#send...#doSend -> NettyProducerClient#sendMessage -> NettyClient#sendSync...sendAsync

主要的代码:

  • NettyClient: 负责底层连接的创建和消息的发送
  • NettyProducerClient: 简单的封装
  • BrokerLoadBalance: 负载均衡消息到集群中broker
  • NettyConnection: queue的消息处理器
  • MessageSenderGroup: 消息发送和异常处理的封装
  • RPCQueueSender: 通过RouterManager将消息遍历投递给NettyConnection
  • RouterManager: 路由消息应该进入哪一个队列

broker

处理流程如下

  SendMessageProcessor#processRequest -> SendMessageWorker#receive -> MessageStoreWrapper#putMessage -> DefaultStorage#appendMessage -> MessageLog#putMessage
  -> LogSegment#append -> RawMessageAppender#doAppend  

broker使用LogManager维护log, 提供最新的log进行mmap方式写入.

除了正常的消息投递, broker还会异步构建consumer queue. 相关处理流程如下:

Dispatcher#run...processLog  ->  FixedExecOrderEventBus#post -> BuildConsumerLogEventListener#onEvent -> ConsumerLog#putMessageLogOffset -> LogSegment#append -> ConsumerLogMessageAppender#doAppend

入口是在 ActionLogIterateService内部类 Dispatcher中, 通过不断读取 message Log构建consumer log.

扩容怎么处理

扩容过程中, 会收到发送失败的异常,然后拉取最新的路由列表, 重新发送