记录学习 mit 6.824 课程的经历

MapReduce 1

目标:

将任务拆解成map 和 reduce 两个阶段, 进行 大规模的数据处理, 比如 页面爬取、词频统计

模型

mapreduce

如上图, map/reduce 架构会将用户输入切成若干份数据输入(map的个数), 由map进行处理, 按照论文的说法, map读取文件是本地读取操作, map计算后得到的结果, 会按照 hash到若干份(reduce个数)本地文件存储, 并将存储位置上报给 master, master启动reduce worker, reduce worker会远程获取 每个map机器上的文件, 本地计算后输出到 gfs(分布式文件存储)上. 为了更好的性能和效果, 在map输出后以及reduce输入前, 会有一个combiner任务, 对map的结果进行预处理, 减少网络传输, 但是和reduce不同, combiner 的输出结果是存储在磁盘上的.

分布式场景下, 容易出现一些坏的机器导致map/reduce 执行慢, 对此, map/reduce 架构会重新执行任务. 对于执行完宕机的场景, map会触发重新执行(结果存放在本地), reduce 不需要重新执行(结果存放在gfs)

map/reduce的场景中, 需要处理 热点倾斜的问题, 因为会出现大量数据集中在一台reduce机器上, 对于这种问题, 需要自定义良好的 partition 函数, 将数据尽可能的平均打散

hadoop

架构

将论文中 partition/combiner 的抽象成 shuffle, 进行分区、排序、分割, 将属于同一划分(分区)的输出合并在一起并写在磁盘上,最终得到一个分区有序的文件.

问题

  1. 难以实时计算(处理存储在磁盘上的离线数据)
  2. 不能流式计算(处理的数据源是静态的文件, 流式计算处理的数据是动态的, storm/spark)
  3. 难以 DAG(任务输入和输出依赖的有向无环图, mapreduce的计算结果都会写入磁盘, 会造成大量频繁io, 性能低下, spark)

GFS 2

架构

arch

根据论文的描述, 核心组件是 master 和 chunkserver, master 负责管理chunk replica(创建、替换、复制、负载均衡, 通过chunk version number区分最新的chunk)、chunk 选主(lease机制)、GC(通过心跳进行懒删除文件)、文件管理(目录的形式组织文件), chunkserver 负责存储 chunk, chunk是存储的最小单位, 一个文件会被划分成多个chunk, chunk默认是 64MB.

master的数据特点: 通过operation log 记录所有请求操作, 并定期将内存状态存储到checkpoint, 加快恢复时间. chunkserver的数据特点: chunk内部会划分成 64KB 的 block, 每个block做checksum校验, 避免损坏的数据.

在高可用方面, master 也是多副本的, 不过以standby的形式存在, 当master挂了, 通过DNS切换通知客户端. chunkserver本身的多副本机制确保了高可用.

需要注意的是, master 持久化的元数据并不包括 chunk->chunkservers 的映射, 这个是通过master-chunkserver 之间的心跳上报实现的. (rocketmq中topic->brokers的映射也是这样, kafka则将这些信息维护在了zk上)

写入

write

写入比较特殊, 根据论文, 写入其实是个两阶段提交, 如图所示, 首先, 客户端从master获取到元数据(primary+secondary replica)并缓存起来, 然后客户端向所有的chunkserver发送写请求, chunkserver 将请求写入到自己的LRU缓存(注意这里没落盘), 确保全部写入成功后(失败会重试), client 请求primary提交数据, primary 会分配一个连续的序列号, 然后将将之前的修改存储起来, 然后primary请求所有secondary进行落盘, 如果部分replica落盘失败, primary返回错误给client, client会重试保证最终成功避免数据的不一致.

但是我比较疑惑的是, 写入是指写入到指定位置(offset) 还是指定的chunk 吗? append 的语意比较清晰, 就是追加到文件末尾.

append语意比write略微复杂, append 需要保证写入的chunk有足够的空间容纳数据, 否则会对当前chunk进行补齐(primary会同步secondary进行同样的操作), 然后client重试下一个chunk.

gfs并没有支持更新.

需要注意的时候, 数据的一致性完全依赖客户端的重试. 这样选主之后, 即使primary不是最新的, 也可以通过重试补齐. 有些暴力, 因为很多业务并不需要全部成功, 可以容忍失败, 希望尽快的返回错误, 因此现在基本上使用 raft 的方式同步, 但是相比retry, 复杂了些, 而且raft 至少需要三副本.

hdfs

架构

开源有hdfs, hdfs存在以下问题:

  1. 不适合低延迟数据访问, hdfs 的设计目标是高吞吐量
  2. 无法存储大量小文件 (参考facebook hystack)
  3. 不支持多用户写入和随机修改 (hdfs的一个文件只有一个写入者, 并且是追加写)

hbase

新版cfs

google 对gfs做了一次升级, 降低master单点故障, 但是思路很神奇, 最终是将元数据存储在chubby+bigtable上. 更多参考

primary-backup 3

根据论文内容, 这个思路是基于虚拟机的, 通过底层 hypervisor进行数据的同步, 如下:

backup

primary会通过 log channel 将数据同步给 backup, backup receive之后primary才会响应给client, 这里存在几个问题:

  1. logchannel 同步带宽
  2. backup receive 速度, 太慢影响primary的处理响应, 太快backup来不及处理导致lag增大(这样backup成为leader后, 需要先消费完这些数据才能成为leader, 灾备时间被延迟了)
  3. 非确定性的事件太多, 因为是机器级别的复制, 存在计时等干扰的非确定性因素同步
  4. 引入了 共享磁盘/非共享磁盘的 复杂度. 磁盘用来脑裂
  5. 存在重复输出和丢失输出(告诉客户端存在, 实际上backup并没与同步到)

参考

raft 4

以可理解性为目标, 解耦了 leader election、log replication、safety 三个方面, 如下:

  • strong leader: 写从leader流向 follower
  • leader election: 随机timer
  • membership change: joint consensus/ 联合一致性

基于 有序的replicated log 的复制状态机(replicated state machine). 一致性模块 确保 replicated log的一致性. 安全(非拜占庭问题下)、可用、不依赖时间. 更多的参考之前的学习: paper

在实践上, 建议参考 5

进一步阅读: Oki and Liskov’s Viewstamped Replication [29, 22] chubby spanner

zookeeper 6

目标: 提供构建复杂协调原语的简单、高性能的内核

这里的论文6 只是讲述了 zookeeper基于zab构建的协调服务. 对外提供的语意: lock,read/write lock, double barier, leader election, group membership, configuraiton management, zookeeper 抽象了znode存储数据(最大1MB), 参照 文件系统的 层级结构 组织znode. 并提供了 Regular 和 Ephemeral 两种 znode. 文章中重点讲述了 zookeeper 对外的 client api: create/delete/exists/get/getdata/setdata/getChildren/sync, 其中, getData、exits、getChildren 提供了 watch 模式, 用于当数据变更的时候进行通知, 但是需要注意的是, 数据获取的这几种方式并不是连接leader获取最新的数据, 默认是获取任意server本地的数据, 这样虽然提供了并发性能, 但是存在”stale data“/过期数据的问题, 支持如果想获取最新数据, zookeeper 提供了 sync 的api, 在调用read之前调用sync, sync会保证将之前pending write执行完, 这样再read就是最新的数据了. 按照论文的说法, sync+leader 应该是在leader上的, 根据读操作 FIFO 的特性, sync就不需要 zab原子广播, 只需要本地前面的操作同步完就可以了, 但是需要确保leader还持有租约, zookeeper 的解决方案比较直接, 通过前一个zab广播协议进行leader确认, 如果不是leader的话广播就会失败, 这里存在一个特别点, 如果sync+read之前有写操作, 直接依赖之前的写操作的广播协议, 如果没有写操作, 则需要一个 null transaction的原子广播执行.

这里谈到了读操作, 其实zookeeper提供的操作是 线性写、FIFO read, 线性写必须是通过leader执行, 通过zab原子广播到其他节点, 因此节点越多写操作越慢. 在内部存储上, zookeeper 用内存存储状态, replay log(wal) 存储提交的操作, 并且周期性的生成快照. 在快照这一块, zookeeper 并没有使用锁, 而是深度遍历生成快照, 也就意味着快照内数据的时间点是不一致的, 但是因为数据的状态改变是幂等的, 因此可以重复apply.

和之前的论文不同的地方是, 这篇论文还重点举例了 zookeeper的几个应用: fetching service、indexer service、message broker.

奇怪的是, 这篇文章并没有讲述 zab 协议的具体细节.

异步 线性 zookeeper

https://www.cnblogs.com/yeyang/p/11420920.html chubby 有论文吗?

其他参考阅读(TODO):

CRAQ7

架构参考:

chain-replica

全称是: Chain Replication with Apportioned Queries: 分摊查询的级联复制, 相比CR, 通过分摊查询压力到其他节点提升了整个架构的读吞吐量. 传统的CR模型, head负责write, tail负责read, 写入从head不断下沉到tail, tail 是整个commit的保证, tail commit之后逐级反馈到head, 然后head发送响应到client. 因为tail 是最终commit的保证, 因此从tail读取肯定是已经提交的最新的数据. 那么 CRAQ 模型将读取压力分摊到其他节点, 怎么保证读取的是最新的数据呢? 根据初始的CQ模型, write是级联下沉的, 当节点收到write但是还没收到commit, 那么就会标记数据为 dirty, 当client read定向到这个节点, 这个节点会请求 tail 节点获取最新的数据, 然后相应给客户端; 如果被请求的节点没有标记dirty, 那么节点直接返回数据. 这样就避免了stale read.

适配全球化进程, craq 还支持了 多数据中心、多Chain, 但是和raft不一样, 本身不解决协调问题, 而是依赖zk解决 group membership(成员管理)、元数据存储、以及节点变更通知. 但是论文并没有讨论相关细节. 论文中还谈到了 广播协议, 避免逐级传播的网络开销, 加快write和commit流程. 比较遗憾的是, 论文中提到 大对象的原子更新用小事务完成, 不过没有完成. 在节点变更上, 略微有些复杂, 甚至有些地方不是很理解, 实现的话需要认真思考下

在功能上, 除了 对象存储常规的 read/write, 还支持 prepend/append、incre/decr、test-and.

在压测结果上, 整个效果还是很理想的.

AURORA 8

参考论文中的良心架构:

aurora

aurora 是 amazon 提供的 OLAP关系型数据库, 在mysql 5.6 分支上改造, 将存储层和计算层进行了分离. aurora 重点在于 the log is the database, 通过同步虚拟分段的redo log 进行存储层之间的数据同步, 减少mysql mirror模型下大量的数据同步, 需要注意的是, aurora 用的是6路复制(3个az, 每个az两个副本). 在一致性层面, 通过quorum达成了最终一致性, 并通过 gossip 进行数据的补齐. 本质上是个 一写多读 的方案. 比较特别的是, 还用了s3做备份

在架构上, 计算层是 Aurora MySQL/instance, 存储层是 Storage Node. 在工程实践上, 计算层只是简单的 fork 了 mysql源码并简单了修改.

分层的设计中, instance(database tire)负责: query processor, transactions, locking, buffer cache, access methods and undo management; storage node负责: redo logging, durable storage, crash recovery, and backup/restore. 也就是将存储层进行了沉降.

写入流程: storage node将接受到的 redo log record(这个是个啥?)添加到内存队列, 持久化到磁盘上后ack. 在apply的时候, 会检查是否存在lag, 如果存在lag, 则通过gossip协议进行补齐. 最终将记录放到新的数据页, 并周期性同步到s3, 并周期性的 gc老版本 和 校验 crc. 这里只有内存队列排队和ack 两个步骤是同步的, 其他是异步的, 所以性能会有所提升. 可以明显的发现, 这里调用写入和写入page其实是两个分离的行为, 那么是否存在着 客户端写入被ack了, 但是读取的时候可能读取到的是 stale data/过期的数据.

读流程: 根据分层设计, instance(database layer) 负责 buffer cache, 因此读取大部分情况下是读取 database layer, 只有当数据页找不到的时候, 才会产生 storage IO 请求. 因此缓存数据, 所以存在空间不够需要驱逐的场景, 传统数据库是刷新dirty page, 但是在 aurora, 则是刷新 PAGE LSN 大于vdl的page, 因为读取都是读取当前vdl的数据, 因此大于vdl的数据则是没有意义的. 问题, 如果都是小于等于vdl的, 怎么驱逐呢? 在read的时候, 使用 vdl 取代了读取快照作为 read point, 而不是通过read quorom来达成一致, database layer 是知道每个 segment 的 scl, 但是需要处理 segment 不能给出大于 read point的数据, 除此之外, database 可以计算出 Protection Group Min Read Point LSN(PGMRPL), 也就是read 请求的 read point 不可能小于 PGMRPL, 通过这个点, storage node 可以安全的回收

关于复制, 在传统的镜像复制/mysql mirror中, Primary/Backup Instance 之间复制需要 Log、Binlog、Data、FRMFiles(元数据), aurora 只需要同步 redo log 就可以了.

分段的设计中, database volumn 被分成 固定大小的(10GB)的 segment, 通过复制构成 pg(protection groups), 通过分段减少 mttr(平均恢复时间), 因为 mttf 是不可控制的.

更艳遇事务, 多个事务之间的干扰处理 以及 事务处理没怎么说. 但是只有一个 writer, 模型应该相对简单

对于恢复的处理, 常规的wal+快照, 同时也有undo log回滚未提交的日志. 也会增加一个 epoch 避免崩溃恢复被打断. 除此之外, instance(database layer) 需要重新构建运行时状态, 比如数据缓冲和一些元数据(vdl). 对于undo, aurora因为设计在database层, 所以 database 进行执行.

几个概念:

  • LSN: 日志序列号, 每个日志记录的唯一ID
  • SCL/Segment Complete LSN: segment PG已经收到的最大的LSN. 用来gossip阶段数据补齐的. 每个storage node 不一样. 读取的时候, 需要选择 scl > read point 的.
  • VCL/Volumn Complete LSN: 存储层上之前的日志记录的"可用”, 在恢复期间, 大于VCL的日志记录都会被截断. 在恢复期间, storage server 上 已经被 quorum reads 存储的最大的LSN. 可能还未提交. Storage layer的概念
  • CPL/Cosistemcy Point LSN: database layer 的概念. 每个mtrs(mini transaction, 每个database layer的transaction会被分成多个mini transaction)产生的最后一个log record的LSN.
  • VDL/Volumn Durable LSN: database layer的概念. 最大的CPL. 恢复阶段通过read quorum确定. 小于 VCL. write流程中, database layer 收到 storage layer的ack的时候, 就会增长

恢复的时候, database 和 storage沟通 创建每个pg的 durable point来创建 vdl.

几个问题:

  • mysql mirror 的模型: 暂时没找到资料, 论文中说 需要同步binlog redolog data 和 double write, 按照之前的经验, 我们使用的binlog同步复制策略并没与这么多数据需要同步. 不是很理解
  • redo log: 记录页的变更记录, innodb 的wal. 因此 ddl 变更 对应的redo log也是很大的
  • 看样子底层存储使用的是 amazon block store,这个不就有一致性能力吗? 为什么还需要复制?
  • 事务的处理: ? 仅仅说了在 database layer将事务拆分成多个 mtrs, 最终commit
  • 事务隔离级别的保证: 支持可重复读
  • quorum需要保证写入失败的场景下, 通过gossip不会写入所有副本, 不然冗余数据被提交了呀.
  • storage tier 负责生成数据页, 那么有个问题, database tire/instance 没有数据有怎么生成 redo log 呢? 因此是 database layer 会从 storage node 捞取数据上去, 并且只有在 vdl
  • 读取空间不足, 如果都是 page lsn <= vdl, evit alg 怎么实践?
  • 存在写入成功后, 不一定立即能够读取到, 因为ack 是先写入wal的, storage node可能还没有处理, 怎么解决的?

更多参考:

Frangipani 9

参考:


  1. 作者: Jeffrey Dean, Sanjay Ghemawat: MapReduce: Simplified Data Processing on Large Clusters ↩︎

  2. 作者: Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung: The Google File System ↩︎

  3. 作者: Daniel J. Scales, Mike Nelson, and Ganesh Venkitachalam: The Design of a Practical System for Fault-Tolerant Virtual Machines ↩︎

  4. 作者: Diego Ongaro, John Ousterhout: In Search of an Understandable Consensus Algorithm (Extended Version) ↩︎

  5. 作者: Diego Ongar: CONSENSUS: BRIDGING THEORY AND PRACTICE ↩︎

  6. 作者: Patrick Hunt, Mahadev Konar, Flavio P. Junqueira, Benjamin Reed: ZooKeeper: Wait-free coordination for Internet-scale systems ↩︎

  7. 作者: Jeff Terrace, Michael J. Freedman:Object Storage on CRAQ High-throughput chain replication for read-mostly workloads ↩︎

  8. 作者: Amazon Web Services: Amazon Aurora: Design Considerations for High Throughput Cloud-Native Relational Databases ↩︎

  9. 作者: Chandramohan A. Thekkath, Timothy Mann, Edward K. Lee: Frangipani: A Scalable Distributed File System ↩︎