Preface

RocketMQ 去年实现了基于raft协议的 commitlog 存储库, 解决master-slave架构下 人工选主、切主 的故障转移的运维负担, 以及故障转移过程中数据丢失的问题.

raft

写入

raft 协议的写入如下:

    1. client -> leader: 客户端请求leader写入kv
    2. leader append 本地日志(commitlog)
    3. leader 并行发送日志给 follower 
    4. follower收到日志, 写入本地 commit log, 并 apply 本地的 FSM, 返回成功给 leader  
    5. leader收到follower超过半数以上的成功响应, 本地apply 日志到 FSM.  

读取

raft 协议的读取如下:

    1. client -> leader: 客户端请求leader写入kv 
    2. leader 通过 lease 检查自己是否是 leader,
    3. 检查是leader的情况下, 检查本地 apply index 和 客户端的 index, apply index大的话, 读取本地的状态机的数据 + apply index返回
    4. 不是leader的情况下, 就请求leader获取最新的 apply index, 和 客户单的index 比较,  apply index 大的话, 读取本地的状态机的数据 + apply index 返回

参考

  • (官网)[https://raft.github.io/]
  • (论文)[https://raft.github.io/raft.pdf]
  • (可视化动图)[http://thesecretlivesofdata.com/raft/]
  • (raft大规模使用)[https://zhuanlan.zhihu.com/p/23872141]

dledger commitlog

dledger logic

消息系统应用raft 协议, 需要考虑到一个问题, 就是 RMQ 的commitlog 本身就是系统的最终状态, 这个和 raft 协议的 状态机重叠, 因此, 可以使用 commitlog 替代 raft中的状态机实现. 对于master身份的标志, 也是基于raft 的 leader-follower-condidate 实现, 故障转移也是基于 raft的自动选主

  1. 复制模型

异步复制 并发复制

  1. 可靠性

对于网络分区部分节点term非常大但是日志量很低的场景, 添加了两个阶段: WAIT_TO_REVOTE 和 WAIT_TO_VOTE_NEXT, 投票的时候, 先进入 WAIT_TO_REVOTE, 这个阶段 term 不会递增, 只有有效投票数超过半数, 节点状态才会进入 WAIT_TO_VOTE_NEXT, 这个时候term 才会增加. 并且通过了构建的jepsen测试

dledger impl

存储

DLedgerStore 提供了内存和文件持久化两种方式, 其中, 我们需要理解的几个概念:

  • DLedgerEndTerm:
  • DLedgerBeginIndex: 第一个数据文件的ledgerIndex
  • DLedgerEndIndex: 最新添加的日志的index, 只有leader才能递增
  • committedIndex: 提交的index
  • MemberState: currTerm voteLeader

MemberState存储一下参数:

  • ledgerEndIndex
  • ledgerEndTerm

内存的实现比较简单, 没有复杂的逻辑管理, 只是简单的添加. DLedgerMmapFileStore 通过mmap实现文件存储, 分成 数据文件 和索引文件, 因为数据文件的数据大小是变化的, 因此需要索引文件进行索引, 方便ledgerIndex 进行索引. 因为 commitlog 的特殊性, 需要定期进行刷新和删除旧文件. 刷新也借鉴 commitlog的设计, 做了一个checkpoint. 删除服务分两个维度操作, 一个是 storeDir 的磁盘占比, 另一个是 data 目录的磁盘占比, 前者包括了索引的磁盘占比。

灾备: 借鉴了rmq的设计, 先load完整的文件, 在recover: 也是从倒数三个文件检查, 从后向前找正确的文件(check文件的头数据和相应索引的数据), 然后遍历数据文件的每一个数据, 数据检查有两种选择的方式: needWriteIndex. 在recover的时候是否写入索引, 不写入索引的时候, 就检查数据内容和索引.

文件名就是数据的绝对偏移. 数据文件内容的格式:

magic(int)
all data size(int)
entryIndex(long)
entryTerm(long)
pos(long)  // 数据的绝对偏移
channel(int)
chain crc(int)
body crc(int)

索引文件的内容格式:

magic(int)
pos(long) // 数据的绝对偏移
data size(int)
entry index(long)
entry term(long)

append

leader处理请求的流程如下:

DLedgerServer#handleAppend -> DLedgerMmapFileStore/DLedgerMemoryStore#appendAsLeader + DLedgerEntryPusher#waitAck -> DLedgerEntryPusher$EntryDispatcher(分发请求吧和处理响应, 一个follower一个) + DLedgerEntryPusher$QuorumAckChecker(committed offst移动)  

EntryDispatcher 的流程如下:

DLedgerEntryPusher$EntryDispatcher: getPeerWaterMark? -> DLedgerRpcService.push: updatePeerWaterMark + quorumAckChecker(更新term peerId ack的点位, 更新 quorm ack, 推进 quorum commited index, 将ack的内容请求响应送回去)

可以发现, 主要由以下几个模块:

  • DLedgerEntryPusher$EntryDispatcher: 负责rpc发送请求 到follower. 每个follower一个 EntryDispatcher
  • DLedgerEntryPusher$QuorumAckChecker: 负责follower ack 的记录以及 leader committed index 的推进 (需要半数ack)

follower 对于append请求如下:

DLedgerRpcNettyService#processRequest-> DLedgerRpcNettyService#handlePush->DLedgerServer#handlePush->DLedgerEntryPusher$EntryHandler#handlePush-> 放到map, 异步处理 -> DLedgerEntryPusher$EntryHandler$#dowork (单独的线程) -> #handleDoAppend ->  dLedgerStore.appendAsFollower + dLedgerStore#updateCommittedIndex    

上面只是专门写了 单个请求, 其实还是支持 batch 处理的.

EntryHandler#dowork 还有特殊情况的处理

对于写文件, DLedgerMmapFileStore#appendAsLeader 会同时写入数据文件和索引文件, 同时更新 ledgerBeginIndex = ledgerEndIndex; 对于follower, 调用 DLedgerMmapFileStore#appendAsFollower, 也是写入数据文件和索引文件, 但是数据文件的元数据直接使用, 不需要像master一样进行更新

get

调用路径:

DLedgerRpcNettyService#processRequest -> #handleGet -> DLedgerServer#handleGet -> DLedgerMmapFileStore#get: 从索引文件获取pos和size, 再从数据文件读取 

特殊情况: truncate

当新的leader当选的时候, 会要求 follower truncate. 整理逻辑如代码注释:

     * The push has 4 types:
     *   APPEND : append the entries to the follower
     *   COMPARE : if the leader changes, the new leader should compare its entries to follower's
     *   TRUNCATE : if the leader finished comparing by an index, the leader will send a request to truncate the follower's ledger
     *   COMMIT: usually, the leader will attach the committed index with the APPEND request, but if the append requests are few and scattered,
     *           the leader will send a pure request to inform the follower of committed index.
     *
     *   The common transferring between these types are as following:
     *
     *   COMPARE ---- TRUNCATE ---- APPEND ---- COMMIT
     *   ^                             |
     *   |---<-----<------<-------<----|

leader路径:

DLedgerEntryPusher$EntryDispatcher#doWork -> #doCompare ->#changeState + #doTruncate( -> #changeState) 

DLedgerEntryPusher$EntryDispatcher 的初始状态就是 COMPARE, 因此 dowork 会先进行 compare 操作, 给follower发送compare 操作, 对于有冗余数据的follower, 需要发送truncate请求, 才能进入 append状态, 否则直接进入 append 状态.

存在问题, 没看到 follower 同步leader 老数据的流程 … ?

follower

DLedgerEntryPusher$EntryHandler#handlePush -> #doWork -> DLedgerEntryPusher$EntryHandler#handleDoTruncate -> DLedgerMmapFileStore#truncate 

pull 没有实现pull请求. DLedgerRpcNettyService#pull 没有调用方

DLedgerRpcNettyService#processRequest -> #handlePull -> DledgerServer#handlePull

选主

主要的逻辑在 DLedgerLeaderElector 中, 心跳处理、状态管理(StateMaintainer)、投票. 对term 比较大的请求, 都是退化成candidate重新处理.

有一个leadership transfer 逻辑很奇怪, 发起者貌似在cmd client.

DLedgerRpcNettyService#handleLeadershipTransfer-> DLedgerServer#handleLeadershipTransfer -> DLedgerLeaderElector#handleLeadershipTransfer/handleTakeLeadership -> DLedgerRpcNettyService#leadershipTransfer ->  DLedgerRpcNettyService#leadershipTransfer

亮点

  • ResettableCountDownLatch: 支持重置, AbstractQueuedSynchronizer 提供了接口, 这里进行了扩展
  • cmdline: 支持了命令行
  • MmapFileList 的抽象, 简化了底层的目录的封装

dledger commitlog impl

基于 Dledger 的commitlog实现位于: org.apache.rocketmq.store.dledger.DLedgerCommitLog . 为了简化实现, 直接继承了 org.apache.rocketmq.store.CommitLog.

为了兼容老版本进行升级, 使用了一个特殊的字段: dividedCommitlogOffset, 是以 DledgerFilterStore 的第一个文件的起始位置 计算出来的, 小于这个offset, 就是老式的 mappedFileQueue 存储方式, 否则就是新版.

dledger based commitlog 的实现比较简单, 是直接依赖 完善的 dledger 实现, 读写和recover. 这里对几个点说明下:

  1. 日志格式

和老版本一致

  1. 读写

写入新版本走的是 raft 协议, 由 Dledger 实现. 老版本经过主从复制实现 (半同步复制和异步复制)

读取新版本走的是 raft 协议, 由 Dledger 实现, 会判断下 offset 是否是已经提交, 只读取已经提交的内容. 老版本直接读取.

  1. 选主

无感知

  1. 日志复制

新版本走的是raft 协议, 由 Dledger 实现. 老版本走的半同步/异步复制

参考

https://juejin.im/post/5d53bbf76fb9a06b1417dc16 https://raft.github.io/raft.pdf https://web.stanford.edu/~ouster/cgi-bin/papers/OngaroPhD.pdf