Preface

之前分析了 基于raft的DLedger实现, 这里分析下老版本的master-slave 主从复制 以及 刷盘机制

抽象

  • GroupCommitRequest: 如果是 半同步, 即MASTER_SYNC 模式, 会对写入请求封装成 GroupCommitRequest 触发同步
  • HAService$GroupTransferService: 用来检查 GroupCommitRequest 是否同步成功/超时. 同步操作依赖底层统一的数据同步实现
  • HAService$HAClient: slave用来创建和master的连接, 上报 offset 和 获取写入数据到 commitlog
  • HAService$AcceptSocketService: 用来接收slave创建的连接, linux平台使用epoll, 只监听accept事件
  • HAConnection: master上表示slave创建的一个连接
  • HAConnection$ReadSocketService: 专门负责读取 slave 提交的ackOffset 的线程, 只监听read事件
  • HAConnection$WriteSocketService: 负责发送同步数据, 也会在数据包中包含心跳的头(间歇), 只监听write事件

replicaRequest 流程

master 因为后面的版本支持了 future 模式, 因此 入口有两个, 分别对应 CommitLog#putMessage 和 CommitLog#asyncPutMessage, 最终都会进入 CommitLog#submitReplicaRequest. replicaRequest 提交并不会触发任何同步行为, 因为同步本身是异步线程进行的, 提交的 replicaRequest 仅仅用来 检测 slave同步是否成功/超时, 相当于在另一个线程排队 (消息是顺序写入commitlog的, 因为replicaRequest 自带了顺序特性).

replicaRequest提交处理流程:

CommitLog#submitReplicaRequest -> HAService#putRequest -> HAService$GroupTransferService#run -> #doWaitTransfer -> GroupCommitRequest#wakeupCustomer 用于通知commitlog 是否已经同步完成

判断是否已经同步给slave, 只是简单判断了下 HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(), 也就是说 只要有一个slave 同步了消息, 同步就成功了.

数据同步流程

master 写入流程, master 会启动一个 AcceptSocketService 接受客户端的连接, 每一个客户端slave连接用 HAConnection 对象表示, 对应读写分别启动 WriteSocketService 和 ReadSocketService 两个对象.

master ReadSocketService

ReadSocketService 用来读取 slave 提交的 ackOffset, 用来同步的时候读取数据的offset.

流程如下:

AcceptSocketService#run -> HAConnection#run -> HAConnection$ReadSocketService#run -> #processReadEvent ->  HAService#notifyTransferSome -> HAService$GroupTransferService#notifyTransferSome

可以发现, 读线程的最终效果除了修改 同步的offset, 还会通知堵塞的 GroupTransferService 有数据同步完成, 及时的返回结果给 写入请求.

master WriteSocketService

master 用来提交发送心跳以及 数据 给slave. 比较简单, 不赘述

但是需要注意的是, master 是主动推送给slave的, 和 kafka 设计中 slave 主动拉取不一样. master 推送之前, slave会周期性的上报自己的offset, 这样master 就知道从哪个位置开始推送.

slave

slave 会启动一个HAClient 连接到 master, 上报自己的ackOffset, 并不断读取 master 推送的数据.

slave 读取流程如下

HAClient#run -> #processReadEvent -> #dispatchReadRequest -> defaultMessageStore#appendToCommitLog 

切主

rocketmq 不支持自动切主, 因此需要人工参与. 可以参考 https://github.com/didi/DDMQ/wiki/%E8%87%AA%E5%8A%A8%E4%B8%BB%E4%BB%8E%E5%88%87%E6%8D%A2

flush 机制

rocketmq 支持 同步刷盘和异步刷盘两种模式, 同步刷盘一定程度上降低了 rocketmq 的处理能力.

同步/异步 刷盘

抽象

  • GroupCommitService: 同步刷盘策略, 每次写入数据都会触发磁盘刷盘
  • FlushRealTimeService: 近实时刷盘策略, 数据页数量或者刷新时间到, 会触发刷新

同步实现

因为上层抽象了 future模式, 导致写入有两个入口, 对应的 刷盘也有两个入口: CommitLog#submitFlushRequestCommitLog#handleDiskFlush.

无论同步异步还是同步, 进入同步刷盘的逻辑流程如下:

GroupCommitService#putRequest ->  MappedFileQueue#flush( -> MappedFile#flush) + GroupCommitRequest#wakeupCustomer (发回响应)

文件的第一次刷新需要刷新元数据. 使用双buffer进行读写, 每次读完buffer, 会刷新一次checkpoint.

异步刷盘

FlushRealTimeService 触发刷新的条件有两个:

  • 刷新间隔超过配置的 flushCommitLogThoroughInterval, 默认 10s, 会触发无脑刷新
  • 刷新的page数量超过配置的 flushCommitLogLeastPages, 默认是 4

近实时刷新每次刷新的时候都会更新checkpoint. 刷新的时候支持一个参数配置: flushCommitLogTimed, 按时间刷新, 就是等待指定的 flushIntervalCommitLog 才进行刷新, 默认false, 就是有写入请求触发才进行刷新. 注意, 这个只是触发条件. 真正刷新需要传递 flushLeastPages 参数的, 0是无脑刷新, 有值 值需要满足 待刷新page数 符合需求才会刷新. 因此 在满足 flushCommitLogThoroughInterval 的情况下, flushLeastPages 就是传0