背景

最近要开发延迟消息, 这里记录下 recover相关的逻辑实现

原理

之前知道, rocketmq是所有的消息统一投递到 commitlog, 然后异步构建 consumer queue, 那么, 如果机器正常重启/异常宕机的情况下, 又是怎么恢复的呢?

前菜

rocketmq 使用了 checkpoint 文件记录了 physicMsgTimestamp logicsMsgTimestamp indexMsgTimestamp 三个字段, 分别表示 commitlog 的flush的时间点、comsumer queue的flush的时间点、index file 刷新的时间点. 也就是 已经落地磁盘的时间点. (通过fileChannel#force)

那么 这些时间点什么场景下会被更新, 什么时候checkpoint会flush呢?

  1. physicMsgTimestamp

首先, CommitLog 本身既有一个定时flush的任务, 根据flush方式的不同, 有两种实现: GroupCommitService 和 FlushRealTimeService(后面单独分析), 无论是同步还是异步, 每次flush之后都会设置 physicMsgTimestamp.

除此之外, 在 dledger模式中, slave构建 consumer queue的时候 也会设置 physicMsgTimestamp

  1. logicsMsgTimestamp

在定时flush consumer queue 以及 追加consumer queue消息的时候, 都会更新. (因此, logicsMsgTimestamp 并不是 consumer queue flush的时间)

  1. indexMsgTimestamp
  • 在切换 indexFile 的时候, 会触发 把之前的 indexFile 刷新 以及 更新 indexMsgTimestamp

flush

  • rocketmq 正常关闭 (这里会触发两次刷新, DefaultMessageStore#shutdown 会分别调用 storeCheckpoint#flush storeCheckpoint#shutdown)
  • 在indexFile切换的时候, 不仅仅会flush 之前的 index file, 还会触发 checkpoint file flush
  • conumer queue 定时刷新的时候, 除了更新 logicsMsgTimestamp, 也会触发 checkpoint file flush

正常重启

rocketmq 每次启动的时候, 会在存储根目录下面新建一个 abort 文件, 如果是正常关闭, 那么在shutdown的时候会删除 abort 文件, 如果是异常宕机 (断电、进程强杀等), abort 文件就会一直存放在那里. 因此在启动的时候, DefaultMessageStore 就会检查是否存在 abort 文件, 判断是正常启动 还是 异常启动.

恢复过程中, 需要区分两种情况, commitlog文件可能已经完全被删除了, 这个时候需要将 consumer queue文件也全部删除 常规恢复的场景中, 主要分为三步骤:

  1. 恢复 consumer queue, 获取 consumer queue 中最大的 offset
  2. 恢复 取commitlog, 最大的 processOffset, 对 consumer queue 的多出的数据文件进行 截断
  3. 根据 commitlog 恢复 consumer queue 的消息

1-3参考 DefaultMessageStore#recover、 ConsumeQueue#recover、CommitLog#recoverNormally, 4 的逻辑在 DefaultMessageStore#start

流程1 从倒数3个文件开始恢复 (不足三个, 从第一个文件开始, 倒数第三个 是一个经验数值), 找到最大的 consumer queue的 processOffset, 设置元信息: flushedWhere 和 committedWhere; 然后遍历文件进行 truncate:

文件起始offset 大于 processOffset 的直接删除, 
文件结束offset 小于 processOffset 不处理,
offset 位于 文件内部的情况, 设置文件的 元信息

需要注意的是, 因为 rocketmq 支持 tag ext 扩展文件, 因此在恢复的时候, 也会对 tag ext 恢复. 这里不赘述

流程2

从倒数3个文件开始恢复 (不足三个, 从第一个文件开始), 通过检查文件内容是否合法确定最后的写入位置: processOffset, 这里检查文件内容合法的方法比较特殊: 读取文件内容并构建一个 dispatchRequest. 根据 processOffset 设置元信息: flushedWhere 和 committedWhere, 遍历文件进行 truncate, 逻辑和 consumer queue的一样.

和 consumer queue 文件恢复不同的地方在于, commitlog 需要对consumer queue的文件内容进行 “纠偏”. 因为 consumer queue的数据都是从 commitlog 构建的, 因此 需要确保consumer queue的数据在 commitlog 全都要找到, 因此在恢复的时候, 需要根据 commitlog 的 processOffset 对 consumer queue 进行截断 和 元信息重置

  • 需要注意的是, 在读取文件如何区分 读取到文件末尾的情况?

commitlog在写入数据的时候, 会进行判断 msgLen + 8() > 剩余文件空间, 如果true, 那么, 就会放弃在这个文件写入, 会轮转到下一个文件写入, 同时在这个文件的末尾写入 totalSize(int=4 byte) + CommitLog.BLANK_MAGIC_CODE(int=4 byte)

流程3

完成上面两个流程, 基本上保证了 commitlog 和 consumer queue 文件的正确性. 但是这里存在一个问题, 可能consumer queue的数据 少于commitlog, 因为 构建consumer queue 速度慢于 commitlog 或者 consumer queue 文件被删除 或者新启的broker copy了别的机器的commitlog. 那么就需要一个机制 将commitlog 中的数据 重新构建到 consumer queue, 流程3 就是做了这件事情

通过获取 consumer queue最大的 processOffset, 然后从 commitlog 的 processOffset 点位进行构建工作.

异常宕机

异常宕机相比于正常关闭, 需要借助 checkpoint 文件进行恢复.

整体流程和正常恢复差异不大, 依旧是上面上个流程

  1. 恢复 consumer queue, 获取 consumer queue 中最大的 offset
  2. 恢复 取commitlog, 最大的 processOffset, 对 consumer queue 的多出的数据文件进行 截断
  3. 根据 commitlog 恢复 consumer queue 的消息

唯一的不同, 在于第二步骤, 因为是异常宕机, 所以不能从倒数第三个, 需要一个可以 check的时间点(minTime) 进行恢复, 这个时间点之前的文件是可以认为是正确的, 时间点之后的文件开始恢复. 根据配置的不同, 有两种选择:

  1. 需要根据checkpoint记录的 physicMsgTimestamp 和 logicsMsgTimestamp 的最小值开始恢复.
    1. 配置 MessageStoreConfig#messageIndexSafe = ture (默认false) 和 messageIndexEnable = true (默认true) 的时候, 会以 physicMsgTimestamp logicsMsgTimestamp indexMsgTimestamp 的最小开始恢复

确定了时间点之后, 倒序找到第一个文件存储时间小于 minTime的文件, 从这个文件开始, 不断读取消息 并执行 dispatcher 责任链逻辑: 构建consumer queue、构建index message. 剩下的逻辑: 重置元信息、截断 consumer queue 和正常恢复一样

需要注意的是, 这里提供了 index message 的minTime 机制, 对于一些依赖 index 逻辑的场景, 还是很有必要的

总结

  1. 异常恢复需要重建 consumer queue 和 index message
  2. 恢复过程需要确保 consumer queue 信息和 commitlog 对齐, 不能多(截断)、不能少(重新reinput)
  3. 依赖index message的场景, 需要开启 MessageStoreConfig#messageIndexSafe=true, 确保index 的完整性

思考

  1. 代码冗余度很高, 需要优化下