2 minutes
Rmq_ha
Preface
之前分析了 基于raft的DLedger实现, 这里分析下老版本的master-slave 主从复制 以及 刷盘机制
抽象
- GroupCommitRequest: 如果是 半同步, 即MASTER_SYNC 模式, 会对写入请求封装成 GroupCommitRequest 触发同步
- HAService$GroupTransferService: 用来检查 GroupCommitRequest 是否同步成功/超时. 同步操作依赖底层统一的数据同步实现
- HAService$HAClient: slave用来创建和master的连接, 上报 offset 和 获取写入数据到 commitlog
- HAService$AcceptSocketService: 用来接收slave创建的连接, linux平台使用epoll, 只监听accept事件
- HAConnection: master上表示slave创建的一个连接
- HAConnection$ReadSocketService: 专门负责读取 slave 提交的ackOffset 的线程, 只监听read事件
- HAConnection$WriteSocketService: 负责发送同步数据, 也会在数据包中包含心跳的头(间歇), 只监听write事件
replicaRequest 流程
master 因为后面的版本支持了 future 模式, 因此 入口有两个, 分别对应 CommitLog#putMessage 和 CommitLog#asyncPutMessage, 最终都会进入 CommitLog#submitReplicaRequest
. replicaRequest 提交并不会触发任何同步行为, 因为同步本身是异步线程进行的, 提交的 replicaRequest 仅仅用来 检测 slave同步是否成功/超时, 相当于在另一个线程排队 (消息是顺序写入commitlog的, 因为replicaRequest 自带了顺序特性).
replicaRequest提交处理流程:
CommitLog#submitReplicaRequest -> HAService#putRequest -> HAService$GroupTransferService#run -> #doWaitTransfer -> GroupCommitRequest#wakeupCustomer 用于通知commitlog 是否已经同步完成
判断是否已经同步给slave, 只是简单判断了下 HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset()
, 也就是说 只要有一个slave 同步了消息, 同步就成功了.
数据同步流程
master 写入流程, master 会启动一个 AcceptSocketService 接受客户端的连接, 每一个客户端slave连接用 HAConnection 对象表示, 对应读写分别启动 WriteSocketService 和 ReadSocketService 两个对象.
master ReadSocketService
ReadSocketService 用来读取 slave 提交的 ackOffset, 用来同步的时候读取数据的offset.
流程如下:
AcceptSocketService#run -> HAConnection#run -> HAConnection$ReadSocketService#run -> #processReadEvent -> HAService#notifyTransferSome -> HAService$GroupTransferService#notifyTransferSome
可以发现, 读线程的最终效果除了修改 同步的offset, 还会通知堵塞的 GroupTransferService 有数据同步完成, 及时的返回结果给 写入请求.
master WriteSocketService
master 用来提交发送心跳以及 数据 给slave. 比较简单, 不赘述
但是需要注意的是, master 是主动推送给slave的, 和 kafka 设计中 slave 主动拉取不一样. master 推送之前, slave会周期性的上报自己的offset, 这样master 就知道从哪个位置开始推送.
slave
slave 会启动一个HAClient 连接到 master, 上报自己的ackOffset, 并不断读取 master 推送的数据.
slave 读取流程如下
HAClient#run -> #processReadEvent -> #dispatchReadRequest -> defaultMessageStore#appendToCommitLog
切主
rocketmq 不支持自动切主, 因此需要人工参与. 可以参考 https://github.com/didi/DDMQ/wiki/%E8%87%AA%E5%8A%A8%E4%B8%BB%E4%BB%8E%E5%88%87%E6%8D%A2
flush 机制
rocketmq 支持 同步刷盘和异步刷盘两种模式, 同步刷盘一定程度上降低了 rocketmq 的处理能力.
同步/异步 刷盘
抽象
- GroupCommitService: 同步刷盘策略, 每次写入数据都会触发磁盘刷盘
- FlushRealTimeService: 近实时刷盘策略, 数据页数量或者刷新时间到, 会触发刷新
同步实现
因为上层抽象了 future模式, 导致写入有两个入口, 对应的 刷盘也有两个入口: CommitLog#submitFlushRequest
和 CommitLog#handleDiskFlush
.
无论同步异步还是同步, 进入同步刷盘的逻辑流程如下:
GroupCommitService#putRequest -> MappedFileQueue#flush( -> MappedFile#flush) + GroupCommitRequest#wakeupCustomer (发回响应)
文件的第一次刷新需要刷新元数据. 使用双buffer进行读写, 每次读完buffer, 会刷新一次checkpoint.
异步刷盘
FlushRealTimeService 触发刷新的条件有两个:
- 刷新间隔超过配置的
flushCommitLogThoroughInterval
, 默认 10s, 会触发无脑刷新 - 刷新的page数量超过配置的
flushCommitLogLeastPages
, 默认是 4
近实时刷新每次刷新的时候都会更新checkpoint. 刷新的时候支持一个参数配置: flushCommitLogTimed
, 按时间刷新, 就是等待指定的 flushIntervalCommitLog
才进行刷新, 默认false, 就是有写入请求触发才进行刷新. 注意, 这个只是触发条件. 真正刷新需要传递 flushLeastPages
参数的, 0是无脑刷新, 有值 值需要满足 待刷新page数 符合需求才会刷新. 因此 在满足 flushCommitLogThoroughInterval
的情况下, flushLeastPages 就是传0