2 minutes
Rocketmq_nomessage
背景
最近在修rocketmq-golang-client的问题的时候, 发现在处理 PullNoNewMsg 的时候会导致 offset 被自动提交, 但是用户并没有设置自动ack, 并且也没有手动ack
注:
rocketmq 开源的版本并没有ack的概念
排查
于是, 通过日志打印调试, 发现是在 rocketmq-client-go 拉取消息处理 primitive.PullNoNewMsg
的状态的时候, 直接将 result.NextBeginOffset 替换为 request.nextOffset, 并且还 更新了本地offsetStore的offset 信息, 因为 rocektmq-client-go 是 周期性提交offset, 所以导致了 offset被ack 了
解决
在rocketmq-client-go的内部开发版本中, 直接将 offset 的本地存储更新给注释掉就可以了, 因为内部开发中, 是异步处理处理消息的, 并且offset的提交不需要满足递增的特性 (考虑到很多场景中可能存在 offset被移动到 更小的情况)
在开源的版本中, 对齐java的实现, 判断 processQueue是否有消息, 如果没有消息, 在更新本地offsetStore, 避免提交了 正在消费的消息
更多的理解
乘这次机会, 重新梳理了 rocketmq 在 pullMessage 的响应逻辑的处理. 根据客户端处理的逻辑, 区分如下 (不涉及到transaction)
1.NO_NEW_MSG
当broker返回 ResponseCode.PULL_NOT_FOUND
的时候, 客户端会转义成 PullStatus.NO_NEW_MSG
, 会执行如下操作:
- 设置下次pullMessage的request的nextOffset 为
pullResult.getNextBeginOffset()
- 如果这个queue本地的processQueue没有消息, 就更新本地offsetStore的offset到nextOffset, 这个会通过定期提交offset最终提交到 broker的记录中
- 重新调度拉取消息
那么, broker 什么时候会返回 PULL_NOT_FOUND
呢?
- NO_MESSAGE_IN_QUEUE: queue没有数据, 可能是确实没有数据写入, 也可能是被删除了, 没数据写入的场景下会返回
PULL_NOT_FOUND
, nextoffset=0, client的操作没有任何意义(甚至重新调度的时间也是可以延迟的). 如果是删除的场景, 会返回PULL_OFFSET_MOVED
, 客户端转义成OFFSET_ILLEGAL
, 具体操作参考下面的分析 (对应broker内部 NO_MESSAGE_IN_QUEUE) - 在读取consumer queue file 时候失败了, 一般是文件出问题了, 直接跳转到下一个文件 (broker内部 OFFSET_FOUND_NULL)
- consumer queue已经读取完毕了 (broker内部OFFSET_OVERFLOW_ONE)
2.NO_MATCHED_MSG
当broker返回 PULL_RETRY_IMMEDIATELY
, 客户端会转义成 NO_MATCHED_MSG
, 执行如下操作:
- 重置 pullRequest#nextOffset 为 pullResult#nextBeginOffset
- 如果queue对应的processQueue没有消息, 则重置offset
- 重新调度拉取消息
那么, broker 什么时候会返回 PULL_RETRY_IMMEDIATELY
呢?
- 从 从节点 读取消息但是 BrokerConfig().isSlaveReadEnable() 不允许 从读取
- 一轮拉取行为中, 第一次从 commitlog 没有读取到消息, 并且随后也都没有读取到消息. 这种情况下,是 consumer queue的消息已经是过期的了, 下次拉取的offset 就是 当前offset + 拉取数量的offset. (对应 MESSAGE_WAS_REMOVING)
- consumer queue 的tag过滤不满足 或者 类过滤不满足 (对应NO_MATCHED_MESSAGE)
3.OFFSET_ILLEGAL
当broker返回 PULL_OFFSET_MOVED
, client会转义成 OFFSET_ILLEGAL
, 并执行如下操作:
- 更新 processQueue 的 nextOffset 字段为
pullResult.getNextBeginOffset()
, 并设置dropped=true
, 丢弃这个queue - 10s 后执行: 更新offsetStore的queue对应的offset并持久化到 broker, 然后从 本地删除 messageQueue (等待下次rebalance之后queue的owner进行拉取操作)
不理解: 为什么要等待10s, 理解执行问题也不大
那么, broker 什么时候会返回 PULL_OFFSET_MOVED
呢
- consumer queue文件被删除了, 导致需要从0开始读取consumer queue文件, 需要client重置offset (对应broker NO_MESSAGE_IN_QUEUE)
- client offset 大于 consumer queue maxOffset, 原因是 consumer queue文件消息丢失: 比如 主从切换, commitlog文件最近文件损害导致consumer queue文件截断等 (broker内部 OFFSET_OVERFLOW_BADLY)
- consumer offset 小于 consumer queue minoffset, 很可能是 consumer queue 老的数据文件被清理了, 而 cosnumer group 的消费能力还没有消费完 之前被清理的数据 (或者consumer group被暂停了一段时间) (对应broker的OFFSET_TOO_SMALL)
4.FOUND
有消息, 不赘述. 会返回下次拉取的起点offset字段: nextOffset
需要注意的是, 只要能够拉取到一条消息,返回的状态码都会是 FOUND
总结
- 还是很复杂的, 自定义逻辑还是需要避免采坑