The Design of a Practical System for Fault-Tolerant Virtual Machines

阅读论文 Fault-Tolerant Virtual Machines,参考 FAQnote

概述

论文使用虚拟机、机器级别的主从复制(一主一从)和共享磁盘的方式设计容错系统,目前只支持单处理器的虚拟机。

有两种复制方式,状态转移复制和状态机复制。状态转移复制是将主节点的所有状态复制到从节点;而状态机复制要求节点是一个确定性状态机,不同节点从同一个状态以相同的顺序执行操作,可以得到相同的结果。

比较有意思的是,不像我所了解的常规数据复制,论文实现的系统使用机器级别的复制,状态包含 CPU、内存和 I/O 设备的状态,操作是 x86 指令。在机器级别上,状态转移复制的缺点是会将所有状态的更改通过网络进行传输,发送状态需要很多带宽;状态机复制使用更少的网络带宽,但是需要特殊处理非确定性操作(例如:获取时间、定时中断)来保证主从一致,这在物理服务器上很难实现,特别是当处理器频率增加时。

PS:时间是非确定的很好理解,但是定时中断为什么是非确定的,我的想法是虽然主从的状态是一致的(如果没有中断),但是执行速度也不是完全一致,所以中断的时机可能不同。

论文设计的系统使用状态机复制,利用虚拟机(VM)由虚拟机管理程序完全控制的特性,当主虚拟机执行非确定性操作时,虚拟机管理程序可以捕获必要的信息发送给从虚拟机,将非确定性操作转化为确定性操作,从而保证主从一致。只支持单处理器虚拟机,因为多处理器产生的并发操作是非确定性的,存在显著的性能问题。

主从虚拟机运行在不同的服务器上,从虚拟机和主虚拟机以相同的方式运行,并且总是有较小的延迟(小于 100 毫秒),课程讲义提到至少滞后一个日志项。只有主虚拟机在网络上发布其存在,输入(例如:网络、磁盘、键盘、鼠标)只会发送给主虚拟机,主虚拟机通过网络连接(称为日志通道,logging channel)将其转发给从虚拟机。同时,只有主虚拟机会产生输出,从虚拟机的输出将被虚拟机管理程序丢弃。

确定性重放

VMware FT 使用确定性重放,使从虚拟机能够通过重放日志达到和主虚拟机相同的状态。具体来说,主虚拟机的输入和非确定性操作都会被虚拟机管理程序捕获,然后生成日志(不会写入磁盘),发送给从虚拟机。对于非确定性操作,日志会记录必要的信息,保证主从一致。例如,对于中断操作,日志会记录操作发生时所完成的指令数量。论文提到该技术的实现有使用硬件性能计数器(HPC)。

特别注意,日志仅包含输入和非确定性操作相关的信息,确定性操作在从虚拟机的本地执行。具体来说,主虚拟机和从虚拟机都是状态机,会自动执行操作(由 VM 中的 Linux 和 App 发起,这也说明了为什么主从虚拟机的初始状态必须相同),只不过输入只会发送给主虚拟机,以及存在非确定性操作,所以输入和非确定性操作需要以日志的形式包含额外信息发送给从虚拟机。

容错协议

但是仅使用确定性重放还不够,需要额外的机制保证系统的容错性。如果主虚拟机在执行输出操作之后发生故障,而日志没有发送给从虚拟机,那么从虚拟机接管之后,在其输出之前的非确定性操作(未收到日志的)可能会得到不同的输出结果,从而产生不一致(不一致主要是针对客户端的感知而言)。

解决方案是在主虚拟机发送输出之前,向从虚拟机发送输出操作的日志并等待其确认,当从虚拟机接收到该输出操作及之前的所有日志之后,从虚拟机回复一个确认,然后主虚拟机可以向外部发送输出。注意,主虚拟机只是延迟发送输出,但是没有停止执行(即在等待输出的同时会继续执行其他操作,就像在某个线程等待磁盘 I/O 时会切换到其他线程一样)。该机制在主虚拟机发生故障时,可能会产生两次相同的输出,因为从虚拟机无法得知主虚拟机是否发送输出,但是论文随后提到 TCP 可以保证网络数据包的去重(我的理解是 TCP 是根据序列号去重的,由于主从虚拟机状态相同,所以会产生相同的序列号)。

故障检测和恢复

系统通过监控节点的心跳(使用 UDP),以及日志通道上的流量来判断节点是否发生故障(使用定时中断,保证流量不会停止)。

  • 从虚拟机故障:主虚拟机继续执行,但是停止发送日志。
  • 主虚拟机故障:从虚拟机重放日志以追赶主虚拟机,然后将从虚拟机提升为主虚拟机。VMware FT 会在网络上发布新的主虚拟机的 MAC 地址,以便交换机知道其位于哪个服务器上。
  • 网络分区故障:主虚拟机可能由于网络问题和从虚拟机中断连接,如果此时将从虚拟机提升为主虚拟机,将会导致脑裂。为避免该问题,容错协议要求在检测到故障时,主从虚拟机需要在共享磁盘上执行 test-and-set 原子操作。操作成功的虚拟机作为主虚拟机存活,操作失败的虚拟机会自行中止。

不论是启动系统时,还是故障恢复时,都需要保证存在一个从虚拟机。VMware FT 使用 FT VMotion 功能,将虚拟机复制到集群中的某个服务器上(根据资源使用情况和其他约束条件选择)。该功能会建立从源虚拟机到目标虚拟机的日志通道,并且将源虚拟机设置为主虚拟机(记录日志模式),目标虚拟机设置为从虚拟机(重放日志模式)。该功能仅会中断源虚拟机小于 1 秒的时间,在复制的过程中源虚拟机仍会正常执行,日志会被存放在缓冲区中。

总结

论文介绍容错虚拟机的实现,还提到磁盘 I/O 和网络问题及其解决方案,不同设计的决策以及对各个负载的性能测试,详情参见论文。PS:课程讲义很不错,可以加深对论文内容的理解,我的理解还是太浅。

The Google File System

阅读论文 GFS,参考 FAQnote

概念

GFS 是一个分布式文件系统,用于大型分布式数据密集型应用程序,例如 MapReduce。系统的设计基于以下场景:

  • 系统由许多机器组成,所以会频繁发生故障。
  • GB 级别的文件很多,普通文件系统会将文件划分为很多块,不便管理。
  • 读负载由大量顺序读和少量随机读组成,写负载由大量追加写和少量随机写组成。
  • 高持续带宽比低延迟更重要,应用程序需要快速进行批处理,而对响应时间没有严格要求。

实现

GFS 集群由单个 master 和多个 chunkserver 组成,由多个客户端访问。文件被划分为 64 MB 的块(chunk),每个块都是 chunkserver 中的一个文件。在分配空间时使用懒分配策略,避免产生内部碎片从而浪费空间。master 在创建块时,会为其分配一个不可变且全局唯一的 64 位块句柄(chunk handle)。每个块都会被复制到多个 chunkserver 上(默认三个),以保证系统的可靠性。

master 主要负责维护系统的元数据、存储日志和检查点、租约管理以及控制垃圾收集、重新复制、负载均衡和快照创建。元数据包括命名空间、访问控制信息、从文件名到块句柄数组的映射,以及为每个块维护版本号和副本所在 chunkserver 的列表。master 通过心跳消息定期与 chunkserver 通信,向其发出指令和收集其状态。chunkserver 主要负责存储文件数据、版本号和校验和,64 MB 的 chunk 被划分为 64 KB 的 block,每个 block 都有一个 32 位的校验和。

链接到应用程序的 GFS 客户端代码实现文件系统 API,通过与 master 和 chunkserver 通信来代表应用程序读写数据。客户端从 master 获取元数据,从 chunkserver 获取文件数据。客户端不需要缓存文件数据(但会缓存元数据),因为负载通常是顺序读和追加写。客户端没有实现 POSIX API,因此不需要挂钩到 Linux vnode 层。PS:我没有查到 Linux vnode 相关的资料,vnode 似乎是 BSD 中的概念,和 VFS 有关。

块大小

Linux 文件系统的默认块大小为 4 KB,GFS 使用 64 MB 的块大小是基于其 GB 级文件场景而设计的,优势如下:

  • 由于应用程序通常是顺序读写文件数据,所以 64 MB 的块大小可以减少客户端和 master 的交互次数。
  • 许多操作会发生在同一个块中,使得客户端和 chunkserver 保持长 TCP 连接,有利于减少网络开销。
  • master 中的元数据更少,从而可以全部放入内存,避免磁盘 I/O。

同时,论文提到小文件可能只有一个块,面对多个应用程序的访问,存储该块的 chunkserver 有可能成为热点。GFS 的解决方案是使用更高的复制因子存储小文件,同时使批处理队列系统错开应用程序访问小文件的时间。潜在的解决方案是允许客户端在该情况下从其他客户端读取数据。

元数据

命名空间和映射会以操作日志(operation log)的形式持久化到 master 的本地磁盘和复制到远程机器,保证 master 能够在崩溃之后恢复。为保证一致性,只有在本地和远程将相应的日志刷新到磁盘之后,master 才会响应客户端的操作。可以对刷新和复制批处理,从而减少开销。当日志超过一定大小时,会创建检查点(使用 B 树),从而避免崩溃恢复时重放所有日志。由于创建检查点比较耗时,master 会切换到新的日志文件,并在单独的线程中创建检查点,以避免在创建检查点时停止执行(写)操作。检查点同样也会被复制到远程机器。

master 不会持久化块的位置列表,而是在启动时以及 chunkserver 加入集群时,向 chunkserver 询问其包含的块。因为块是否存在于某个 chunkserver 是由该 chunkserver 决定的,所以在 master 中持久化该信息没有任何意义,反而会面临同步问题。PS:说明 master 内存中的位置列表可能因为 chunkserver 故障,从而产生不一致。

命名空间其实就是一个将目录名和文件名作为节点的树,通过使用读写锁保证写操作的正确性。具体来说,读/写操作会获取路径上所有祖先节点的读锁,以及目标节点的读/写锁。锁在层级间按照自顶向下的顺序获取,在层级内按照字典序获取,从而避免死锁。PS:这让我想到 B+ 树的蟹行协议只锁定会被修改的节点,大概是因为 B+ 树的数据和路径不像文件和路径那样具有很强的关联性。

读操作

客户端使用固定的块大小,将应用程序指定的文件名和字节偏移量转换为文件内的块索引。然后,它向 master 发送包含文件名和块索引的请求,master 回复相应的块句柄(handle)、副本的位置列表,客户端使用文件名和块索引作为键来缓存该信息(会有过期时间)。

然后,客户端选择其中一个副本发送请求(很可能是距离最近的副本),该请求指定块句柄和块内字节范围。在缓存信息过期或重新打开文件之前,客户端不需要和 master 进行交互。客户端可以在向 master 发送的一个请求中请求多个块,master 也可以在回复中包含请求块之后的多个块信息(利用空间局部性),从而减少客户端和 master 交互的次数。

写操作

在发生写操作时,需要保证多个副本之间的一致性。GFS 使用租约(lease)实现一致性(租约是按块授予的),持有租约的 chunkserver 被称为 primary,其他包含副本的 chunkserver 被称为 secondary。租约的超时时间为 60 秒,primary 可以根据需要续约,续约请求包含在定期的心跳消息中发送给 master,master 也可以在租约到期之前撤销租约(用于快照创建)。如果 master 和 primary 发生网络分区故障,master 可以在旧租约到期之后向另一个副本授予租约,从而避免脑裂。

写操作的流程如下:

  • 客户端首先向 master 询问 primary 和 secondary 的位置并将其缓存。如果 primary 不存在,则 master 任选一个包含副本的 chunkserver 授予租约。客户端会缓存位置信息,当 primary 不可达或者回复其租约过期时,客户端会重新联系 master。然后,客户端以流水线的方式将数据发送到 primary 和 secondary(存储在缓冲区中),并且等待它们的确认响应。
  • 客户端收到所有副本的确认之后,向 primary 发送写请求,该请求会使用之前发送的数据。primary 为其收到的来自多个客户端的请求分配序列号(单个客户端的请求肯定是同步的,在收到响应之前不会发送第二个请求,因为 GFS 没法保证客户端请求的 FIFO 顺序),只有当 primary 本地执行成功之后,才会转发给所有 secondary,请求在所有副本上都按照序列号的顺序执行。当 primary 收到所有 secondary 的完成响应时,primary 回复客户端完成。
  • 论文提到,副本的任何错误都会被报告给客户端,客户端会首先从步骤三开始重试几次,然后从步骤一开始重试。论文有一个前后矛盾的点,首先提到 primary 分配序列号然后应用到本地,之后又说如果在 primary 执行失败就不会分配序列号和转发。不过无关紧要。

一致性模型

GFS 具有宽松的一致性模型,数据突变之后文件区域的状态,取决于突变的类型(随机写和追加写)以及是否存在并发,如下图所示。如果无论客户端从哪个副本读取,始终看到相同的数据,则文件区域是一致的。如果文件区域一致,且客户端将会看到整个突变的内容,则文件区域是定义的。PS:这里的一致并不是严格一致性(可线性化)。

随机写:顺序突变成功状态是定义的和一致的;并发突变成功状态是一致的但是未定义,因为 GFS 使用租约保证突变以相同的顺序应用到所有副本(一致的),但是突变没有对文件区域加锁(未定义)。

追加写:不论是顺序还是并发突变成功,状态都是定义的,因为追加是原子操作且按照指定顺序应用到所有副本。只要有一个副本追加失败,客户端会重试整个追加操作,使得在未失败的机器上多次追加相同的数据,在失败的机器上填充无效的数据(因为追加会指定偏移量),从而产生不一致。特别的,追加的偏移量由 primary 指定,而不是单纯的追加到文件末尾,这可以保证多个副本在成功执行追加的偏移位置的数据是一致的,即使之前发生过失败。也就是说,中间区域不一致,最后区域一致。

此外,失败的突变总是会导致不一致。即使突变成功,数据最终一致,客户端仍可能观察到不一致,因为数据可以从任何包含副本的 chunkserver 读取。假设突变由客户端 A 发起,首先应用到 primary,再发送到 secondary。客户端 B 和 C 在突变过程中分别读取 primary 和 secondary,可以观察到不一致的情况。如果突变很慢,由于网络延迟或故障重试,单个客户端也可以观察到不一致。应用程序需要自行适应 GFS 的宽松一致性模型。

版本检测

如果 chunkserver 发生故障,从而错过突变,其上的块副本将会过时。master 和 chunkserver 会为块维护一个版本号,以此来区分新副本和旧副本。当 master 授予某个块租约时,它会递增该块的版本号,同时和 chunkserver 通信来递增最新副本的版本号。任何包含旧副本的 chunkserver 都不会返回给客户端,master 会在心跳检测中检查副本的版本,然后 master 会指示 chunkserver 对旧副本垃圾收集。作为额外的保护措施,master 会在回复客户端请求或指示 chunkserver 复制时包含版本号,客户端和 chunkserver 会在执行操作时进行验证。

故障处理

master 故障

当 master 发生故障时,GFS 的外部监控基础设施会使用检查点和日志快速恢复,即使磁盘发生故障也可以在其他机器上使用检查点和日志的副本进行恢复。客户端仅使用 DNS 别名访问 master(CNAME),其映射可以随时更改,以适应 master 机器的变更。此外,GFS 使用 shadow master 在 master 故障时提供对文件系统的只读访问,shadow master 略微滞后于 master。由于它存储的是元数据,文件数据存储在 chunkserver 中,所以客户端实际上不会读取到旧数据。

PS:论文提到 shadow master 会按顺序执行操作日志的副本,似乎 master 和 shadow master 的关系就是主从复制,但是为什么 shadow master 只提供只读访问,即使在 master 故障之后。它和 master 应该是最终一致的,为什么论文没有说 shadow master 在执行完所有日志之后会被提升为 master,虽然可以肯定需要人工操作,以避免原 master 只是网络分区故障,从而导致脑裂。

chunkserver 故障

master 使用心跳消息判断 chunkserver 是否存活,如果 master 判断 chunkserver 故障,它会指示其他 chunkserver 对不满足复制因子的数据块重新复制。当故障的 chunkserver 上线时,master 也会删除多余的副本,以及因错过突变而过时的副本。

其他功能

使用写时复制实现快照功能;跨机架的块放置策略;根据服务器的负载进行块放置和块迁移;根据优先级重新复制块以恢复冗余;延迟删除和垃圾收集;使用校验和检测数据块是否损坏;生成诊断日志(包含 chunkserver 的启动/关闭,RPC 的请求/回复记录,不包括文件数据)。

问题

Q:租约具体是如何工作的,master 和 chunkserver 如何判断租约是否过期?如何撤销租约?

A:论文没有说明,我猜测:master 首先发送授予租约的请求,chunkserver 收到请求之后开始计时,同时发送响应给 master,master 收到响应之后开始计时。这样 chunkserver 和 master 都可以判断租约是否到期,以及 master 的计时总是晚于 chunkserver,可以保证避免脑裂。撤销租约只修改 master 本地元数据肯定是不行的,因为客户端可能正在和持有租约的 chunkserver 通信,master 必须直接向 primary 发送撤销请求。

Q:根据上述猜测,如果授予租约的响应丢失,master 该如何处理?是否需要考虑时钟同步问题?

A:如果 master 没有收到响应,它就不会把该 chunkserver 记作 primary 返回给客户端,那么该租约实际上是一个无效租约,master 可以重试或者另选一个包含副本的 chunkserver。计算超时使用的是单调时钟,不需要同步。也可以使用时间戳判断租约是否到期,但是使用的是墙上时钟,需要服务器之间时钟同步。

总结

GFS 使用复制进行容错,从而引入多个副本间的一致性问题。但是仅保证宽松的一致性,而将问题交由应用程序处理,该设计基于其特殊的使用场景,顺序读写以及用于批处理,似乎强一致性显得不是很重要。可以查看课程的笔记和问答加深对论文的理解,笔记结尾的优缺点总结还是很不错的,有一个全局的视角,我有点过于关注一致性了。更多关于 GFS 的讨论,可以阅读 GFS: Evolution on Fast-forward

Threads and RPC

参考 Go FAQcrawler.gokv.gonote

Exercise: Web Crawler

串行爬虫

就是 DFS。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func Serial(url string, fetcher Fetcher, fetched map[string]bool) {
if fetched[url] {
return
}
fetched[url] = true
urls, err := fetcher.Fetch(url)
if err != nil {
return
}
for _, u := range urls {
Serial(u, fetcher, fetched)
}
return
}

并行爬虫

共享 + 锁

在 DFS 的基础上,同时启动多个线程来爬取网页,通过使用锁保护共享变量,使用计数器等待所有线程执行完成。如果网页很多,可能会创建非常多的线程,可以通过使用线程池限制线程的数量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
type fetchState struct {
mu sync.Mutex
fetched map[string]bool
}

func (fs *fetchState) testAndSet(url string) bool {
fs.mu.Lock()
defer fs.mu.Unlock()
r := fs.fetched[url]
fs.fetched[url] = true
return r
}

func ConcurrentMutex(url string, fetcher Fetcher, fs *fetchState) {
if fs.testAndSet(url) {
return
}
urls, err := fetcher.Fetch(url)
if err != nil {
return
}
var done sync.WaitGroup
for _, u := range urls {
done.Add(1)
go func(u string) {
defer done.Done()
ConcurrentMutex(u, fetcher, fs)
}(u)
}
done.Wait()
return
}

func makeState() *fetchState {
return &fetchState{fetched: make(map[string]bool)}
}

通道(channel)

使用 channel 实现同步,不需要 DFS,没有任何共享变量,自然也不需要使用锁(尽管 channel 内部会使用锁)。代码的组织方式有点像 MapReduce。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func worker(url string, ch chan []string, fetcher Fetcher) {
urls, err := fetcher.Fetch(url)
if err != nil {
ch <- []string{}
} else {
ch <- urls
}
}

func coordinator(ch chan []string, fetcher Fetcher) {
n := 1
fetched := make(map[string]bool)
for urls := range ch {
for _, u := range urls {
if fetched[u] == false {
fetched[u] = true
n += 1
go worker(u, ch, fetcher)
}
}
n -= 1
if n == 0 {
break
}
}
}

func ConcurrentChannel(url string, fetcher Fetcher) {
ch := make(chan []string)
go func() {
ch <- []string{url}
}()
coordinator(ch, fetcher)
}

Go RPC

远程过程调用(Remote procedure call,RPC),指程序调用的过程在远程计算机执行。通常表现为客户端向服务器发送请求,服务器向客户端返回响应。数据在通过网络传输时需要进行序列化和反序列化,序列化是将数据转换为可以存储或传输的格式的过程,反序列化是序列化的逆过程。远程过程调用和本地过程调用的区别在于,如何处理失败的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
//
// Client
//

func connect() *rpc.Client {
client, err := rpc.Dial("tcp", ":1234")
if err != nil {
log.Fatal("dialing:", err)
}
return client
}

func get(key string) string {
client := connect()
args := GetArgs{"subject"}
reply := GetReply{}
err := client.Call("KV.Get", &args, &reply)
if err != nil {
log.Fatal("error:", err)
}
client.Close()
return reply.Value
}

func put(key string, val string) {
client := connect()
args := PutArgs{"subject", "6.824"}
reply := PutReply{}
err := client.Call("KV.Put", &args, &reply)
if err != nil {
log.Fatal("error:", err)
}
client.Close()
}

//
// Server
//

type KV struct {
mu sync.Mutex
data map[string]string
}

func server() {
kv := &KV{data: map[string]string{}}
rpcs := rpc.NewServer()
rpcs.Register(kv)
l, e := net.Listen("tcp", ":1234")
if e != nil {
log.Fatal("listen error:", e)
}
go func() {
for {
conn, err := l.Accept()
if err == nil {
go rpcs.ServeConn(conn)
} else {
break
}
}
l.Close()
}()
}

func (kv *KV) Get(args *GetArgs, reply *GetReply) error {
kv.mu.Lock()
defer kv.mu.Unlock()

reply.Value = kv.data[args.Key]

return nil
}

func (kv *KV) Put(args *PutArgs, reply *PutReply) error {
kv.mu.Lock()
defer kv.mu.Unlock()

kv.data[args.Key] = args.Value

return nil
}

其他问题

Q:为什么闭包函数可以使用外部函数的变量?

A:闭包函数引用的外部变量存储在堆中,所以当外部函数返回时,变量依旧存在,垃圾收集器会根据引用计数判断是否回收该变量。注意,如果需要引用循环中的变量,需要通过参数传递(值传递),因为外部变量会随循环变化。

Q:假设有两个线程,线程 A 使用条件变量等待某个条件,线程 B 在达到条件时通知线程 A。如果线程 B 在解锁之前通知线程 A 会有问题吗?

A:如果在解锁之前唤醒线程 A,假设线程 A 在 B 解锁之前被调度,那么线程 A 在获取锁时会被阻塞。之后的某个时刻,操作系统会调度线程 B 解锁,此时线程 A 可以重新被调度。注意,条件变量和锁的阻塞队列是独立的,所以不需要再次通知线程 A。