背景

最近在修rocketmq-golang-client的问题的时候, 发现在处理 PullNoNewMsg 的时候会导致 offset 被自动提交, 但是用户并没有设置自动ack, 并且也没有手动ack

注:

rocketmq 开源的版本并没有ack的概念

排查

于是, 通过日志打印调试, 发现是在 rocketmq-client-go 拉取消息处理 primitive.PullNoNewMsg 的状态的时候, 直接将 result.NextBeginOffset 替换为 request.nextOffset, 并且还 更新了本地offsetStore的offset 信息, 因为 rocektmq-client-go 是 周期性提交offset, 所以导致了 offset被ack 了

解决

在rocketmq-client-go的内部开发版本中, 直接将 offset 的本地存储更新给注释掉就可以了, 因为内部开发中, 是异步处理处理消息的, 并且offset的提交不需要满足递增的特性 (考虑到很多场景中可能存在 offset被移动到 更小的情况)

在开源的版本中, 对齐java的实现, 判断 processQueue是否有消息, 如果没有消息, 在更新本地offsetStore, 避免提交了 正在消费的消息

更多的理解

乘这次机会, 重新梳理了 rocketmq 在 pullMessage 的响应逻辑的处理. 根据客户端处理的逻辑, 区分如下 (不涉及到transaction)

1.NO_NEW_MSG

当broker返回 ResponseCode.PULL_NOT_FOUND 的时候, 客户端会转义成 PullStatus.NO_NEW_MSG, 会执行如下操作:

  1. 设置下次pullMessage的request的nextOffset 为 pullResult.getNextBeginOffset()
  2. 如果这个queue本地的processQueue没有消息, 就更新本地offsetStore的offset到nextOffset, 这个会通过定期提交offset最终提交到 broker的记录中
  3. 重新调度拉取消息

那么, broker 什么时候会返回 PULL_NOT_FOUND 呢?

  1. NO_MESSAGE_IN_QUEUE: queue没有数据, 可能是确实没有数据写入, 也可能是被删除了, 没数据写入的场景下会返回 PULL_NOT_FOUND, nextoffset=0, client的操作没有任何意义(甚至重新调度的时间也是可以延迟的). 如果是删除的场景, 会返回 PULL_OFFSET_MOVED, 客户端转义成 OFFSET_ILLEGAL, 具体操作参考下面的分析 (对应broker内部 NO_MESSAGE_IN_QUEUE)
  2. 在读取consumer queue file 时候失败了, 一般是文件出问题了, 直接跳转到下一个文件 (broker内部 OFFSET_FOUND_NULL)
  3. consumer queue已经读取完毕了 (broker内部OFFSET_OVERFLOW_ONE)

2.NO_MATCHED_MSG

当broker返回 PULL_RETRY_IMMEDIATELY, 客户端会转义成 NO_MATCHED_MSG, 执行如下操作:

  1. 重置 pullRequest#nextOffset 为 pullResult#nextBeginOffset
  2. 如果queue对应的processQueue没有消息, 则重置offset
  3. 重新调度拉取消息

那么, broker 什么时候会返回 PULL_RETRY_IMMEDIATELY 呢?

  1. 从 从节点 读取消息但是 BrokerConfig().isSlaveReadEnable() 不允许 从读取
  2. 一轮拉取行为中, 第一次从 commitlog 没有读取到消息, 并且随后也都没有读取到消息. 这种情况下,是 consumer queue的消息已经是过期的了, 下次拉取的offset 就是 当前offset + 拉取数量的offset. (对应 MESSAGE_WAS_REMOVING)
  3. consumer queue 的tag过滤不满足 或者 类过滤不满足 (对应NO_MATCHED_MESSAGE)

3.OFFSET_ILLEGAL

当broker返回 PULL_OFFSET_MOVED, client会转义成 OFFSET_ILLEGAL, 并执行如下操作:

  1. 更新 processQueue 的 nextOffset 字段为 pullResult.getNextBeginOffset(), 并设置 dropped=true, 丢弃这个queue
  2. 10s 后执行: 更新offsetStore的queue对应的offset并持久化到 broker, 然后从 本地删除 messageQueue (等待下次rebalance之后queue的owner进行拉取操作)

不理解: 为什么要等待10s, 理解执行问题也不大

那么, broker 什么时候会返回 PULL_OFFSET_MOVED

  1. consumer queue文件被删除了, 导致需要从0开始读取consumer queue文件, 需要client重置offset (对应broker NO_MESSAGE_IN_QUEUE)
  2. client offset 大于 consumer queue maxOffset, 原因是 consumer queue文件消息丢失: 比如 主从切换, commitlog文件最近文件损害导致consumer queue文件截断等 (broker内部 OFFSET_OVERFLOW_BADLY)
  3. consumer offset 小于 consumer queue minoffset, 很可能是 consumer queue 老的数据文件被清理了, 而 cosnumer group 的消费能力还没有消费完 之前被清理的数据 (或者consumer group被暂停了一段时间) (对应broker的OFFSET_TOO_SMALL)

4.FOUND

有消息, 不赘述. 会返回下次拉取的起点offset字段: nextOffset

需要注意的是, 只要能够拉取到一条消息,返回的状态码都会是 FOUND

总结

  1. 还是很复杂的, 自定义逻辑还是需要避免采坑