One minute
Rocketmq_msgid
最近利用 msgId 进行一些延迟实现, 结果发现, msgId 在 producer 和 consumer 两侧是不一致的.
复现
我用producer发送一条消息如下:
SendResult [sendStatus=SEND_OK, msgId=0AFE2AEF000018B4AAC2562A9AC70000, offsetMsgId=0AE1578800002A9F0000000C6C988CC2, messageQueue=MessageQueue [topic=test_create_topic, brokerName=sandbox_boe4, queueId=0], queueOffset=0]
需要注意的是, msgId 和 offsetMsgId 是不一样的. 在consumer侧, 我接受到的消息如下:
Receive New Messages: [MessageExt [queueId=0, storeSize=197, queueOffset=0, sysFlag=0, bornTimestamp=1584437632711, bornHost=/10.254.42.239:49872, storeTimestamp=1584437632868, storeHost=/10.225.87.136:10911, msgId=0AE1578800002A9F0000000C6C988CC2, commitLogOffset=53361544386, bodyCRC=198614610, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='test_create_topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, KEYS=OrderID188, CONSUME_START_TIME=1584437674686, UNIQ_KEY=0AFE2AEF000018B4AAC2562A9AC70000, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100], transactionId='null'}]]
consumer 收到的 msg的 msgId 和 producer的 msgId 是不一样的, 但是, producer侧的msgId 和 consumer侧的 UNIQUE_KEY 的值是一样的, producer 的 offsetMsgId 和 consumer侧的 msgId 是一致的.
通过控制台搜索, 发现 (msgId)producer 和 (offsetMsgId)producer 都可以搜到消息, 并且显示的msgId 都是 (msgId)producer.
分析
翻看了源代码发现, (msgId)producer 其实是客户端设置的, 用户没有设置的时候, 会自动生成一个, 参看 MessageClientIDSetter#createUniqID. 这个值在处理发送的响应的时候会被用到, producer 将这个生成的 PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX
的值 放到了 SendResult
的 msgId 对象中, 而将 broker 的msgId 设置到了 SendResult
的 offsetMsgId 中
但是在broker侧, CommitLog#createMessageId
基于 ip+物理偏移 创建的 msgId 传递给了 producer, producer 存储成 offsetMsgId. 需要注意的是 broker 并不会存储这个 msgId. 在消费的时候, consumer 在获取到消息之后, 会基于获取的 storeHost+commotLogOffset 重新创建出 msgId 放到msg 中, 具体参看 MessageDecoder#decode.
所以, 基本上就是 producer 和 consumer 两边的概念不一致导致的
补充
uniqueKey 自动生成的规则像这样: ip+pid+classloader hashcode + 当前时间戳 + 生存时间长度(最大一个月) + counter. 显然, 后面的一段时间并不能保证唯一, 而且在 k8s场景中, ip 也不是唯一的, 用户设置 msgid, 不是单一的key, 是可以多个的.