启动协议

  1. consumer -> meta server: CLIENT_REGISTER, 注册, 获取topic原信息
  2. consumer -> broker: PULL_MESSAGE: 拉取消息
  3. consumer -> broker: ACK_REQUEST: 消息消费完确认

消息的消费流程

consumer

consume 获取消息

   PullRegister#regist...registPullEntry...createAndSubmitPullEntry ->  PullEntry#run...#doPull -> AbstractPullEntry#pull -> PullService#pull -> NettyClient#sendAsync

消息的处理

PullEntry#doPull -> PushConsumerImpl#push -> HandleTaskImpl#run-> HandleTask#run -> GeneratedListener#onMessage -> @QmqConsumer注解的方法

DefaultPullConsumer: 通过queue设计的生产消费模型, PlainPullEntry: 对broker进行负载均衡,从broker拉取消息 WeightLoadBalance: 按权重随机分配要消费的queue的默认策略

ack

HandleTask#run...triggerAfterCompletion -> PushConsumerImpl#ack -> AckHelper#ackWithTrace...#ack -> AckEntry#ack...completed -> AckSendQueue#ackCompleted -> LinkedBlockingQueue#offer -> AckSendQueue#sendAck...doSendAck -> AckService#sendAck...sendRequest ->  NettyClient.sendAsync   

处理完消息之后, 在开启auto commit的时候[ps:并没有不开启的方法], 会进行ack.

broker

consume

PullMessageProcessor#processRequest -> PullMessageWorker#pull -> PullMessageWorker#process -> MessageStoreWrapper#findMessages -> ConsumeQueue#pollMessages -> DefaultStorage#pollMessages -> ConsumerLog#selectIndexBuffer -> MessageLog#getMessage -> LogSegment#selectSegmentBuffer 

消息消费过程中, 因为支持按条ack的mode, 会处理unAck的消息. 其中, 会涉及到Ack的pull log, pull log如下面的ack.

ack

AckMessageProcessor#processRequest -> AckMessageWorker#process -> ConsumerSequenceManager#putAckActions -> ConsumerSequence#setAckSequence + DefaultStorage#putAction -> ActionLog#addAction -> LogSegment#append -> ActionAppender#doAppend 

pullLog 是在Ack过程中进行异步构建的. 流程如下

BuildConsumerLogEventListener#onEvent -> FixedExecOrderEventBus#post ->PullLogBuilder#onEvent -> DefaultStorage#putPullLogs -> PullLog#putPullLogMessages -> LogSegment#append -> PullLogMessageAppender#doAppend 

注册到ActionLogIterateService中, 通过不断读取 ActionLog 构建PullLog 很奇怪的是, 为什么consumer log记录offset, 还记录 pull log.

扩容怎么处理

扩容的时候, consumer先消费完之前的broker的消息, 在接收到扩容指令的消息后, 拉取指定版本的路由关系, 再获取消息