One minute
Qmq Consumer
启动协议
- consumer -> meta server: CLIENT_REGISTER, 注册, 获取topic原信息
- consumer -> broker: PULL_MESSAGE: 拉取消息
- consumer -> broker: ACK_REQUEST: 消息消费完确认
消息的消费流程
consumer
consume 获取消息
PullRegister#regist...registPullEntry...createAndSubmitPullEntry -> PullEntry#run...#doPull -> AbstractPullEntry#pull -> PullService#pull -> NettyClient#sendAsync
消息的处理
PullEntry#doPull -> PushConsumerImpl#push -> HandleTaskImpl#run-> HandleTask#run -> GeneratedListener#onMessage -> @QmqConsumer注解的方法
DefaultPullConsumer: 通过queue设计的生产消费模型, PlainPullEntry: 对broker进行负载均衡,从broker拉取消息 WeightLoadBalance: 按权重随机分配要消费的queue的默认策略
ack
HandleTask#run...triggerAfterCompletion -> PushConsumerImpl#ack -> AckHelper#ackWithTrace...#ack -> AckEntry#ack...completed -> AckSendQueue#ackCompleted -> LinkedBlockingQueue#offer -> AckSendQueue#sendAck...doSendAck -> AckService#sendAck...sendRequest -> NettyClient.sendAsync
处理完消息之后, 在开启auto commit的时候[ps:并没有不开启的方法], 会进行ack.
broker
consume
PullMessageProcessor#processRequest -> PullMessageWorker#pull -> PullMessageWorker#process -> MessageStoreWrapper#findMessages -> ConsumeQueue#pollMessages -> DefaultStorage#pollMessages -> ConsumerLog#selectIndexBuffer -> MessageLog#getMessage -> LogSegment#selectSegmentBuffer
消息消费过程中, 因为支持按条ack的mode, 会处理unAck的消息. 其中, 会涉及到Ack的pull log, pull log如下面的ack.
ack
AckMessageProcessor#processRequest -> AckMessageWorker#process -> ConsumerSequenceManager#putAckActions -> ConsumerSequence#setAckSequence + DefaultStorage#putAction -> ActionLog#addAction -> LogSegment#append -> ActionAppender#doAppend
pullLog 是在Ack过程中进行异步构建的. 流程如下
BuildConsumerLogEventListener#onEvent -> FixedExecOrderEventBus#post ->PullLogBuilder#onEvent -> DefaultStorage#putPullLogs -> PullLog#putPullLogMessages -> LogSegment#append -> PullLogMessageAppender#doAppend
注册到ActionLogIterateService中, 通过不断读取 ActionLog 构建PullLog 很奇怪的是, 为什么consumer log记录offset, 还记录 pull log.
扩容怎么处理
扩容的时候, consumer先消费完之前的broker的消息, 在接收到扩容指令的消息后, 拉取指定版本的路由关系, 再获取消息
144 Words
2019-03-23 09:34 +0800