最近小伙伴聊到 batch消息 在ack的处理方式, 是一个个ack? 还是批量ack? 如果是多个消息存储成1个, 那么 offset 是怎么管理的呢? 消费的时候, ack掉一个就多个消息都被ack了? 那么因为分成了多个消息消费, 会不会导致重复ack?

根据之前写rocketm-client-go的经验, client 传递给broker的消息确实是 batch后的 一条消息, 可是后面的路子就不是很清楚了. 因此深入研究下.

RocketMQ

  1. 应用实战

首先, 重新验证下 对批量消息的理解, 运行下 org.apache.rocketmq.example.batch.SimpleBatchProducerorg.apache.rocketmq.example.simple.PushConsumer 的例子, SimpleBatchProducer 发送批量消息后, 可以发现 PushConsumer 消费的时候, 其实是一条条消费的:

message: Hello world 0 0A5DE93A000018B4AAC231F9BC180000 3 
message: Hello world 2 0A5DE93A000018B4AAC231F9BC180002 5 
message: Hello world 1 0A5DE93A000018B4AAC231F9BC180001 4 

这样就郁闷了, 发送的时候是批量成一条消息发送的(见下文), 那么什么时候解成一条条消息的呢? 而且打印的时候, 我特意打印了每个消息的 messageId 和 queueOffset, 可以发现这些消息的 queueOffset 确实不一样, 也就是说消息确实不一样.

  1. client

首先看下 client 是怎么传递消息的. client 在传递消息的时候, 会对消息进行编码, 一般消息和batch消息的编码方式是不一样的, 参考 MessageDecoder#encodeMessage 和 MessageDecoder#encodeMessages. 批量消息的编码如下:

    public static byte[] encodeMessages(List<Message> messages) {
        //TO DO refactor, accumulate in one buffer, avoid copies
        List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());
        int allSize = 0;
        for (Message message : messages) {
            byte[] tmp = encodeMessage(message);
            encodedMessages.add(tmp);
            allSize += tmp.length;
        }
        byte[] allBytes = new byte[allSize];
        int pos = 0;
        for (byte[] bytes : encodedMessages) {
            System.arraycopy(bytes, 0, allBytes, pos, bytes.length);
            pos += bytes.length;
        }
        return allBytes;
    }

可以发现 消息其实是一个个编码然后放在一起的, 然后放到消息体里面, 然后传递给broker. 那么broker又是怎么处理批量消息的呢?

  1. broker

broker抽象了processor模型, 批处理消息的处理逻辑位于 SendMessageProcessor#sendBatchMessage, 在这个方法中, 首先将 请求的消息体 放到了 messageExtBatch对象的body中, 然后调用了底层存储, 这里以 CommitLog 为例 (DLedgerCommitLog存储类似, 只是做了raft同步). 在 CommitLog#putMessages 中, 会对批量消息进行编码, 参照: messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));, 深入看 batchEncoder 可以发现, 其实就是将 客户端传过来的消息体 当做了一条条物理消息进行了编码, 也就是说, 客户端将多个消息编码成了一条消息传输, 但是broker在存储层却又重新编码成一条条消息的物理格式, 最终在 CommitLog#doAppend 的时候进行了循环写入.

kafka

在kafka里面, 批量发送是底层的聚合效果, 在 kafkaApis#handleProduceRequest, 最终会遍历topicPartition维度的消息写入磁盘文件, 并根据 ack的条件决定是否直接响应