背景

最近基于golang做了 消息查询的功能, 这里做一些记录

原理

rocketmq的消息查询, 支持两种模式: offsetMsgId 和 msgkey、uniqueKey, 我这里避免了 msgid 的命名, 因为在 rocketmq client的实现过程中, msgid 存在很大的差异.

基本概念

offsetMsgId

offsetMsgId 本质上是 rocketmq commitLog 生成的, 生成格式如下:

    public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) {
        input.flip();
        int msgIDLength = addr.limit() == 8 ? 16 : 28;
        input.limit(msgIDLength);

        input.put(addr);
        input.putLong(offset);

        return UtilAll.bytes2string(input.array());
    }

可以发现, offsetMsgId 是基于commitLog 所在的地址 + 消息的offset 组成, 保证了 唯一性. 因此通过offsetMsgId 可以借助这个特性.

msgkey

在消息发送的时候, 发送的消息是可以指定消息的key的, 需要注意的是, msgKey可以设置多个. 如下:

public class Message implements Serializable {
	....
    public void setKeys(String keys) {
        this.putProperty(MessageConst.PROPERTY_KEYS, keys);
    }

    public void setKeys(Collection<String> keys) {
        StringBuffer sb = new StringBuffer();
        for (String k : keys) {
            sb.append(k);
            sb.append(MessageConst.KEY_SEPARATOR);
        }

        this.setKeys(sb.toString().trim());
    }
    .....
}

uniqueKey

producer发送消息的时候, 除了指定key之外, 还可以指定 uniqueKey, 和 key 不同的是, 需要用户保证 uniqueKey 的唯一性. 指定方式需要显示指定:

Message msg = new Message("test_create_topic", "body")
msg.putProperty(MixAll.UNIQUE_MSG_QUERY_FLAG, "uniqueKey")

无论是 uniqueKey 还是 msgKey 都会被索引到 key的索引文件中. 而 offsetMsgId 并不会被索引, 因为offsetMsgId是自描述完备的, 因此不需要单独存储

存储

rocketmq 提供的 key&&uniquekey查询, 是基于 rocketmq dispathcher 机制, 之前我们了解到, rocketmq 是将所有topic的消息 写入到同一个 commitLog, 然后启动了另一个线程从 commitlog 中不断读取消息并追加到 相应的 consumer queue 中. 这里的dispatcher是一个责任链的实现, 不仅仅有 CommitLogDispatcherBuildConsumeQueue (就是生成consumer queue的逻辑) 的实现, 还有 CommitLogDispatcherBuildIndex 的实现, CommitLogDispatcherBuildIndex 就是不断读取消息 并写入 indexFile.

indexFile 的文件格式: storePath/${year}${month}${day}${hour}${min}${sec}${mills} 的格式, 在indexFile 中, 每个key的索引记录(index record)是:

keysHash (4)
phyoffset (4)
timediff (8)
slotValue (4)

这里的 keyHash 是 $topic#$key 的String#hashCode, 需要注意的, indexFile 会分别设置的 多个key 单独索引, 所以一个消息设置了 2个key, 就有两个索引记录. 但是对多个key进行索引的场景下, 必然存在 hash冲突, 大家对于hashmap对于冲突的解决是很了解的, 在 indexFile 的实现中, 也是使用传统的 开址链表 法, 下面讲解下 indexFile的内容.

在indexFile 中, 分成了三部分, indexHeader、 hashSlot、index records.

indexHeader 存储了: beginTimestamp endTimestamp beginPhyOffset endPhyOffset hashSlotCount indexCount, 其中, beginTimestamp 和 endTimestamp 是为了方便在消息搜索的时候对多个索引文件进行时间过滤, 以及存储消息时间的时候减少字节, index record 的 timediff 就是 消息的生成时间 - beginTimestamp 得到的. beginPhyOffset endPhyOffset 的作用是 在删除的时候, 通过比较 endPhyOffset < 指定的offset 进行过期文件删除. hashSlotCount 记录了 indexFile 中 hashSlot 的数量, 没什么意义, indexCount记录了 当前 indexFile中 index record 的数量, 只有当 indexCount 小于 配置的最大值, 才能新增 索引记录(index record), 默认 5_000_000 * 4.

hashSlots 也单独划分的一块文件空间, 用来存储 hashSlot, 相当于 开址链表 的数组空间. 和 传统的map的 开址链表 不同, hashSlot 并不存储 index key, 而是存储了 “最新” 的index record的物理偏移(因为 index record 是固定长度编码, 并且是 append模式递增存储, 这里只需要存储index record的 index count), 当有重复的key映射到同一个 hashSlot 的情况, 会在新的 index record 的字段 slotValue 记录之前的 index record 的 count 地址, 这个时候的hashSlot则存储了 新添加的 index record 的count. 这样实现相对简单, 如果 hashSlot 总是存储第一个 index record 的count, 那么重复场景下 就需要操作两个 index record (除了新增的, 还需要操作老的index record, 将老的index record的 slotValue更新到 新的 index record的值), 而当前实现只需要操作 一个 index record.

index records 的文件空间存储的是实际的 index record. 如上面的记录, 需要注意的是, 因为查询的时候是可以指定时间, 所以这里存储了 timediff, slotValue 起到了链式存储的效果, phyoffset 是为了从 commitlog 获取实际的消息.

查询流程

mqadmin 支持了多种查询的方式:

   queryMsgById         Query Message by Id
   queryMsgByKey        Query Message by Key
   queryMsgByUniqueKey  Query Message by Unique key
   queryMsgByOffset     Query Message by offset

首先, 对于key的查询, 都是基于 IndexService#queryOffset. 签名如下:

queryOffset(String topic, String key, int maxNum, long begin, long end)

签名中, maxNum 是为了避免broker读取大量数据返回给客户端, 影响broker的稳定性. begin 和 end 是 希望查询的数据的时间范围, 查询遍历每个索引文件的时候, 会先对文件的时间进行过滤, 索引文件在 indexHeader 中存储了 beginTimestamp 和 endTimestamp, 分别表示了第一条index record 的时间 和 最后一个 index record 的时间. 搜索的时候, 根据key进行hash得到 hash slot, 然后对 hash slot 指定的 index record 进行读取和过滤. 读取过程中, 因为每一个index record 的 SLOTvalue 都记录了 相同hash 的 前一个 index record 地址, 从而实现了 冲突链表的过滤

下面分别列出几种查询的实现

  1. queryMsgByKey

根据key搜索消息, 这种方式的查询场景下, 参数 begin 是0, end 是 Long.MAX_VALUE, 也就是说, queryMsgByKey 不会对消息的时间进行过滤. 最多获取 64条记录 (这些是mqadmin的硬编码), store 也有参数限制一次获取的最多的结果数量,默认也是 64.

需要注意的是, 因为 indexFile 存储的是key的hash结果, 所以 从indexFile搜索到的消息的key 可能并不是和搜索key 一致, 仅仅是hash结果一样, 因此本地需要过滤

  1. queryMsgById

这里的id指的是 offsetMsgId, 根据前面的理解, offsetMsgId 其实包含了 brokerAddr 和 commitLog offset 信息, 因此可以直接向 目标broker请求读取指定offset的内容

这里有一个让consumer重新消费的功能, 具体分析参照 queryMsgByUniqueKey

  1. queryMsgByUniqueKey

因为uniqueKey的特殊性, 可能是用户主动设置的(会创建索引), 也可能是 offsetMsgId(这种不会创建索引), 所以, 在请求的时候, 需要区分两种情况, 然后请求相应的api. 需要注意的是, 如果是用户设置的 uniqueKey, 那么和 queryMsgByKey 的逻辑有所不同, 因为业务保证了 uniqueKey 的唯一性, 因此本地不需要过滤.

另外, 还支持了通过 clientId 进行 CONSUME_MESSAGE_DIRECTLY 的指令, CONSUME_MESSAGE_DIRECTLY 是为了将让消费者能够直接消费消息. 整个流程如下:

  • 按照之前的流程查找 uniqueKey 的消息
  • 本地消费消息, 取出消息的所在的broker信息
  • 向消息所在的broker发送 CONSUME_MESSAGE_DIRECTLY, 指定 group 和 clientId
  • broker 将消息封装成 CONSUME_MESSAGE_DIRECTLY 的命令, 包含了指定的消息, 发送给消费者
  • 消费者消费这条消息. 需要注意的是, 这里是 一次同步调用, 消费的结果直接返回给broker
  1. queryMsgByOffset

这个api的逻辑比较清晰, 因为已经知道了 消息的broker queue 和 offset 信息, 因此直接启动consumer消费一条消息就可以了.

#文件删除#

index file的过期文件删除依赖 commitlog 和 consumerqueue 的删除逻辑, rocketmq 根据配置定期删除 过期文件. 默认执行周期: cleanResourceInterval = 10_000;, 就是 10s 为一个周期. 主要执行逻辑参照 CleanConsumeQueueService#deleteExpiredFiles 和 IndexService#deleteExpiredFile, 删除的主要逻辑是:

  1. 获取commitlog 的 minoffset
  2. 检查indexFile 文件最早的文件的 endPhyOff 是否小于 minoffset, 如果是不小于, 那么return, 不做任何处理, 即 indexFile没过期
  3. 遍历indexFile 所有文件, 收集 满足endPhyOff < minOffset 条件的 indexFile, 执行删除操作 (fileChannel#close + file#delete)

思考

  1. 竟然提供了消息重新消费的功能
  2. 多种消息查询方式便于运维