3 minutes
Rocketmq_dledger
Preface
RocketMQ 去年实现了基于raft协议的 commitlog 存储库, 解决master-slave架构下 人工选主、切主 的故障转移的运维负担, 以及故障转移过程中数据丢失的问题.
raft
写入
raft 协议的写入如下:
1. client -> leader: 客户端请求leader写入kv
2. leader append 本地日志(commitlog)
3. leader 并行发送日志给 follower
4. follower收到日志, 写入本地 commit log, 并 apply 本地的 FSM, 返回成功给 leader
5. leader收到follower超过半数以上的成功响应, 本地apply 日志到 FSM.
读取
raft 协议的读取如下:
1. client -> leader: 客户端请求leader写入kv
2. leader 通过 lease 检查自己是否是 leader,
3. 检查是leader的情况下, 检查本地 apply index 和 客户端的 index, apply index大的话, 读取本地的状态机的数据 + apply index返回
4. 不是leader的情况下, 就请求leader获取最新的 apply index, 和 客户单的index 比较, apply index 大的话, 读取本地的状态机的数据 + apply index 返回
参考
- (官网)[https://raft.github.io/]
- (论文)[https://raft.github.io/raft.pdf]
- (可视化动图)[http://thesecretlivesofdata.com/raft/]
- (raft大规模使用)[https://zhuanlan.zhihu.com/p/23872141]
dledger commitlog
dledger logic
消息系统应用raft 协议, 需要考虑到一个问题, 就是 RMQ 的commitlog 本身就是系统的最终状态, 这个和 raft 协议的 状态机重叠, 因此, 可以使用 commitlog 替代 raft中的状态机实现. 对于master身份的标志, 也是基于raft 的 leader-follower-condidate 实现, 故障转移也是基于 raft的自动选主
- 复制模型
异步复制 并发复制
- 可靠性
对于网络分区部分节点term非常大但是日志量很低的场景, 添加了两个阶段: WAIT_TO_REVOTE 和 WAIT_TO_VOTE_NEXT, 投票的时候, 先进入 WAIT_TO_REVOTE, 这个阶段 term 不会递增, 只有有效投票数超过半数, 节点状态才会进入 WAIT_TO_VOTE_NEXT, 这个时候term 才会增加. 并且通过了构建的jepsen测试
dledger impl
存储
DLedgerStore 提供了内存和文件持久化两种方式, 其中, 我们需要理解的几个概念:
- DLedgerEndTerm:
- DLedgerBeginIndex: 第一个数据文件的ledgerIndex
- DLedgerEndIndex: 最新添加的日志的index, 只有leader才能递增
- committedIndex: 提交的index
- MemberState: currTerm voteLeader
MemberState存储一下参数:
- ledgerEndIndex
- ledgerEndTerm
内存的实现比较简单, 没有复杂的逻辑管理, 只是简单的添加. DLedgerMmapFileStore 通过mmap实现文件存储, 分成 数据文件 和索引文件, 因为数据文件的数据大小是变化的, 因此需要索引文件进行索引, 方便ledgerIndex 进行索引. 因为 commitlog 的特殊性, 需要定期进行刷新和删除旧文件. 刷新也借鉴 commitlog的设计, 做了一个checkpoint. 删除服务分两个维度操作, 一个是 storeDir 的磁盘占比, 另一个是 data 目录的磁盘占比, 前者包括了索引的磁盘占比。
灾备: 借鉴了rmq的设计, 先load完整的文件, 在recover: 也是从倒数三个文件检查, 从后向前找正确的文件(check文件的头数据和相应索引的数据), 然后遍历数据文件的每一个数据, 数据检查有两种选择的方式: needWriteIndex. 在recover的时候是否写入索引, 不写入索引的时候, 就检查数据内容和索引.
文件名就是数据的绝对偏移. 数据文件内容的格式:
magic(int)
all data size(int)
entryIndex(long)
entryTerm(long)
pos(long) // 数据的绝对偏移
channel(int)
chain crc(int)
body crc(int)
索引文件的内容格式:
magic(int)
pos(long) // 数据的绝对偏移
data size(int)
entry index(long)
entry term(long)
append
leader处理请求的流程如下:
DLedgerServer#handleAppend -> DLedgerMmapFileStore/DLedgerMemoryStore#appendAsLeader + DLedgerEntryPusher#waitAck -> DLedgerEntryPusher$EntryDispatcher(分发请求吧和处理响应, 一个follower一个) + DLedgerEntryPusher$QuorumAckChecker(committed offst移动)
EntryDispatcher 的流程如下:
DLedgerEntryPusher$EntryDispatcher: getPeerWaterMark? -> DLedgerRpcService.push: updatePeerWaterMark + quorumAckChecker(更新term peerId ack的点位, 更新 quorm ack, 推进 quorum commited index, 将ack的内容请求响应送回去)
可以发现, 主要由以下几个模块:
- DLedgerEntryPusher$EntryDispatcher: 负责rpc发送请求 到follower. 每个follower一个 EntryDispatcher
- DLedgerEntryPusher$QuorumAckChecker: 负责follower ack 的记录以及 leader committed index 的推进 (需要半数ack)
follower 对于append请求如下:
DLedgerRpcNettyService#processRequest-> DLedgerRpcNettyService#handlePush->DLedgerServer#handlePush->DLedgerEntryPusher$EntryHandler#handlePush-> 放到map, 异步处理 -> DLedgerEntryPusher$EntryHandler$#dowork (单独的线程) -> #handleDoAppend -> dLedgerStore.appendAsFollower + dLedgerStore#updateCommittedIndex
上面只是专门写了 单个请求, 其实还是支持 batch 处理的.
EntryHandler#dowork 还有特殊情况的处理
对于写文件, DLedgerMmapFileStore#appendAsLeader 会同时写入数据文件和索引文件, 同时更新 ledgerBeginIndex = ledgerEndIndex; 对于follower, 调用 DLedgerMmapFileStore#appendAsFollower, 也是写入数据文件和索引文件, 但是数据文件的元数据直接使用, 不需要像master一样进行更新
get
调用路径:
DLedgerRpcNettyService#processRequest -> #handleGet -> DLedgerServer#handleGet -> DLedgerMmapFileStore#get: 从索引文件获取pos和size, 再从数据文件读取
特殊情况: truncate
当新的leader当选的时候, 会要求 follower truncate. 整理逻辑如代码注释:
* The push has 4 types:
* APPEND : append the entries to the follower
* COMPARE : if the leader changes, the new leader should compare its entries to follower's
* TRUNCATE : if the leader finished comparing by an index, the leader will send a request to truncate the follower's ledger
* COMMIT: usually, the leader will attach the committed index with the APPEND request, but if the append requests are few and scattered,
* the leader will send a pure request to inform the follower of committed index.
*
* The common transferring between these types are as following:
*
* COMPARE ---- TRUNCATE ---- APPEND ---- COMMIT
* ^ |
* |---<-----<------<-------<----|
leader路径:
DLedgerEntryPusher$EntryDispatcher#doWork -> #doCompare ->#changeState + #doTruncate( -> #changeState)
DLedgerEntryPusher$EntryDispatcher 的初始状态就是 COMPARE, 因此 dowork 会先进行 compare 操作, 给follower发送compare 操作, 对于有冗余数据的follower, 需要发送truncate请求, 才能进入 append状态, 否则直接进入 append 状态.
存在问题, 没看到 follower 同步leader 老数据的流程 … ?
follower
DLedgerEntryPusher$EntryHandler#handlePush -> #doWork -> DLedgerEntryPusher$EntryHandler#handleDoTruncate -> DLedgerMmapFileStore#truncate
pull 没有实现pull请求. DLedgerRpcNettyService#pull 没有调用方
DLedgerRpcNettyService#processRequest -> #handlePull -> DledgerServer#handlePull
选主
主要的逻辑在 DLedgerLeaderElector 中, 心跳处理、状态管理(StateMaintainer)、投票. 对term 比较大的请求, 都是退化成candidate重新处理.
有一个leadership transfer 逻辑很奇怪, 发起者貌似在cmd client.
DLedgerRpcNettyService#handleLeadershipTransfer-> DLedgerServer#handleLeadershipTransfer -> DLedgerLeaderElector#handleLeadershipTransfer/handleTakeLeadership -> DLedgerRpcNettyService#leadershipTransfer -> DLedgerRpcNettyService#leadershipTransfer
亮点
- ResettableCountDownLatch: 支持重置, AbstractQueuedSynchronizer 提供了接口, 这里进行了扩展
- cmdline: 支持了命令行
- MmapFileList 的抽象, 简化了底层的目录的封装
dledger commitlog impl
基于 Dledger 的commitlog实现位于: org.apache.rocketmq.store.dledger.DLedgerCommitLog . 为了简化实现, 直接继承了 org.apache.rocketmq.store.CommitLog.
为了兼容老版本进行升级, 使用了一个特殊的字段: dividedCommitlogOffset, 是以 DledgerFilterStore 的第一个文件的起始位置 计算出来的, 小于这个offset, 就是老式的 mappedFileQueue 存储方式, 否则就是新版.
dledger based commitlog 的实现比较简单, 是直接依赖 完善的 dledger 实现, 读写和recover. 这里对几个点说明下:
- 日志格式
和老版本一致
- 读写
写入新版本走的是 raft 协议, 由 Dledger 实现. 老版本经过主从复制实现 (半同步复制和异步复制)
读取新版本走的是 raft 协议, 由 Dledger 实现, 会判断下 offset 是否是已经提交, 只读取已经提交的内容. 老版本直接读取.
- 选主
无感知
- 日志复制
新版本走的是raft 协议, 由 Dledger 实现. 老版本走的半同步/异步复制
参考
https://juejin.im/post/5d53bbf76fb9a06b1417dc16 https://raft.github.io/raft.pdf https://web.stanford.edu/~ouster/cgi-bin/papers/OngaroPhD.pdf