The Google File System

阅读论文 GFS,参考 FAQnote

概念

GFS 是一个分布式文件系统,用于大型分布式数据密集型应用程序,例如 MapReduce。系统的设计基于以下场景:

  • 系统由许多机器组成,所以会频繁发生故障。
  • GB 级别的文件很多,普通文件系统会将文件划分为很多块,不便管理。
  • 读负载由大量顺序读和少量随机读组成,写负载由大量追加写和少量随机写组成。
  • 高持续带宽比低延迟更重要,应用程序需要快速进行批处理,而对响应时间没有严格要求。

实现

GFS 集群由单个 master 和多个 chunkserver 组成,由多个客户端访问。文件被划分为 64 MB 的块(chunk),每个块都是 chunkserver 中的一个文件。在分配空间时使用懒分配策略,避免产生内部碎片从而浪费空间。master 在创建块时,会为其分配一个不可变且全局唯一的 64 位块句柄(chunk handle)。每个块都会被复制到多个 chunkserver 上(默认三个),以保证系统的可靠性。

master 主要负责维护系统的元数据、存储日志和检查点、租约管理以及控制垃圾收集、重新复制、负载均衡和快照创建。元数据包括命名空间、访问控制信息、从文件名到块句柄数组的映射,以及为每个块维护版本号和副本所在 chunkserver 的列表。master 通过心跳消息定期与 chunkserver 通信,向其发出指令和收集其状态。chunkserver 主要负责存储文件数据、版本号和校验和,64 MB 的 chunk 被划分为 64 KB 的 block,每个 block 都有一个 32 位的校验和。

链接到应用程序的 GFS 客户端代码实现文件系统 API,通过与 master 和 chunkserver 通信来代表应用程序读写数据。客户端从 master 获取元数据,从 chunkserver 获取文件数据。客户端不需要缓存文件数据(但会缓存元数据),因为负载通常是顺序读和追加写。客户端没有实现 POSIX API,因此不需要挂钩到 Linux vnode 层。PS:我没有查到 Linux vnode 相关的资料,vnode 似乎是 BSD 中的概念,和 VFS 有关。

块大小

Linux 文件系统的默认块大小为 4 KB,GFS 使用 64 MB 的块大小是基于其 GB 级文件场景而设计的,优势如下:

  • 由于应用程序通常是顺序读写文件数据,所以 64 MB 的块大小可以减少客户端和 master 的交互次数。
  • 许多操作会发生在同一个块中,使得客户端和 chunkserver 保持长 TCP 连接,有利于减少网络开销。
  • master 中的元数据更少,从而可以全部放入内存,避免磁盘 I/O。

同时,论文提到小文件可能只有一个块,面对多个应用程序的访问,存储该块的 chunkserver 有可能成为热点。GFS 的解决方案是使用更高的复制因子存储小文件,同时使批处理队列系统错开应用程序访问小文件的时间。潜在的解决方案是允许客户端在该情况下从其他客户端读取数据。

元数据

命名空间和映射会以操作日志(operation log)的形式持久化到 master 的本地磁盘和复制到远程机器,保证 master 能够在崩溃之后恢复。为保证一致性,只有在本地和远程将相应的日志刷新到磁盘之后,master 才会响应客户端的操作。可以对刷新和复制批处理,从而减少开销。当日志超过一定大小时,会创建检查点(使用 B 树),从而避免崩溃恢复时重放所有日志。由于创建检查点比较耗时,master 会切换到新的日志文件,并在单独的线程中创建检查点,以避免在创建检查点时停止执行(写)操作。检查点同样也会被复制到远程机器。

master 不会持久化块的位置列表,而是在启动时以及 chunkserver 加入集群时,向 chunkserver 询问其包含的块。因为块是否存在于某个 chunkserver 是由该 chunkserver 决定的,所以在 master 中持久化该信息没有任何意义,反而会面临同步问题。PS:说明 master 内存中的位置列表可能因为 chunkserver 故障,从而产生不一致。

命名空间其实就是一个将目录名和文件名作为节点的树,通过使用读写锁保证写操作的正确性。具体来说,读/写操作会获取路径上所有祖先节点的读锁,以及目标节点的读/写锁。锁在层级间按照自顶向下的顺序获取,在层级内按照字典序获取,从而避免死锁。PS:这让我想到 B+ 树的蟹行协议只锁定会被修改的节点,大概是因为 B+ 树的数据和路径不像文件和路径那样具有很强的关联性。

读操作

客户端使用固定的块大小,将应用程序指定的文件名和字节偏移量转换为文件内的块索引。然后,它向 master 发送包含文件名和块索引的请求,master 回复相应的块句柄(handle)、副本的位置列表,客户端使用文件名和块索引作为键来缓存该信息(会有过期时间)。

然后,客户端选择其中一个副本发送请求(很可能是距离最近的副本),该请求指定块句柄和块内字节范围。在缓存信息过期或重新打开文件之前,客户端不需要和 master 进行交互。客户端可以在向 master 发送的一个请求中请求多个块,master 也可以在回复中包含请求块之后的多个块信息(利用空间局部性),从而减少客户端和 master 交互的次数。

写操作

在发生写操作时,需要保证多个副本之间的一致性。GFS 使用租约(lease)实现一致性(租约是按块授予的),持有租约的 chunkserver 被称为 primary,其他包含副本的 chunkserver 被称为 secondary。租约的超时时间为 60 秒,primary 可以根据需要续约,续约请求包含在定期的心跳消息中发送给 master,master 也可以在租约到期之前撤销租约(用于快照创建)。如果 master 和 primary 发生网络分区故障,master 可以在旧租约到期之后向另一个副本授予租约,从而避免脑裂。

写操作的流程如下:

  • 客户端首先向 master 询问 primary 和 secondary 的位置并将其缓存。如果 primary 不存在,则 master 任选一个包含副本的 chunkserver 授予租约。客户端会缓存位置信息,当 primary 不可达或者回复其租约过期时,客户端会重新联系 master。然后,客户端以流水线的方式将数据发送到 primary 和 secondary(存储在缓冲区中),并且等待它们的确认响应。
  • 客户端收到所有副本的确认之后,向 primary 发送写请求,该请求会使用之前发送的数据。primary 为其收到的来自多个客户端的请求分配序列号(单个客户端的请求肯定是同步的,在收到响应之前不会发送第二个请求,因为 GFS 没法保证客户端请求的 FIFO 顺序),只有当 primary 本地执行成功之后,才会转发给所有 secondary,请求在所有副本上都按照序列号的顺序执行。当 primary 收到所有 secondary 的完成响应时,primary 回复客户端完成。
  • 论文提到,副本的任何错误都会被报告给客户端,客户端会首先从步骤三开始重试几次,然后从步骤一开始重试。论文有一个前后矛盾的点,首先提到 primary 分配序列号然后应用到本地,之后又说如果在 primary 执行失败就不会分配序列号和转发。不过无关紧要。

一致性模型

GFS 具有宽松的一致性模型,数据突变之后文件区域的状态,取决于突变的类型(随机写和追加写)以及是否存在并发,如下图所示。如果无论客户端从哪个副本读取,始终看到相同的数据,则文件区域是一致的。如果文件区域一致,且客户端将会看到整个突变的内容,则文件区域是定义的。PS:这里的一致并不是严格一致性(可线性化)。

随机写:顺序突变成功状态是定义的和一致的;并发突变成功状态是一致的但是未定义,因为 GFS 使用租约保证突变以相同的顺序应用到所有副本(一致的),但是突变没有对文件区域加锁(未定义)。

追加写:不论是顺序还是并发突变成功,状态都是定义的,因为追加是原子操作且按照指定顺序应用到所有副本。只要有一个副本追加失败,客户端会重试整个追加操作,使得在未失败的机器上多次追加相同的数据,在失败的机器上填充无效的数据(因为追加会指定偏移量),从而产生不一致。特别的,追加的偏移量由 primary 指定,而不是单纯的追加到文件末尾,这可以保证多个副本在成功执行追加的偏移位置的数据是一致的,即使之前发生过失败。也就是说,中间区域不一致,最后区域一致。

此外,失败的突变总是会导致不一致。即使突变成功,数据最终一致,客户端仍可能观察到不一致,因为数据可以从任何包含副本的 chunkserver 读取。假设突变由客户端 A 发起,首先应用到 primary,再发送到 secondary。客户端 B 和 C 在突变过程中分别读取 primary 和 secondary,可以观察到不一致的情况。如果突变很慢,由于网络延迟或故障重试,单个客户端也可以观察到不一致。应用程序需要自行适应 GFS 的宽松一致性模型。

版本检测

如果 chunkserver 发生故障,从而错过突变,其上的块副本将会过时。master 和 chunkserver 会为块维护一个版本号,以此来区分新副本和旧副本。当 master 授予某个块租约时,它会递增该块的版本号,同时和 chunkserver 通信来递增最新副本的版本号。任何包含旧副本的 chunkserver 都不会返回给客户端,master 会在心跳检测中检查副本的版本,然后 master 会指示 chunkserver 对旧副本垃圾收集。作为额外的保护措施,master 会在回复客户端请求或指示 chunkserver 复制时包含版本号,客户端和 chunkserver 会在执行操作时进行验证。

故障处理

master 故障

当 master 发生故障时,GFS 的外部监控基础设施会使用检查点和日志快速恢复,即使磁盘发生故障也可以在其他机器上使用检查点和日志的副本进行恢复。客户端仅使用 DNS 别名访问 master(CNAME),其映射可以随时更改,以适应 master 机器的变更。此外,GFS 使用 shadow master 在 master 故障时提供对文件系统的只读访问,shadow master 略微滞后于 master。由于它存储的是元数据,文件数据存储在 chunkserver 中,所以客户端实际上不会读取到旧数据。

PS:论文提到 shadow master 会按顺序执行操作日志的副本,似乎 master 和 shadow master 的关系就是主从复制,但是为什么 shadow master 只提供只读访问,即使在 master 故障之后。它和 master 应该是最终一致的,为什么论文没有说 shadow master 在执行完所有日志之后会被提升为 master,虽然可以肯定需要人工操作,以避免原 master 只是网络分区故障,从而导致脑裂。

chunkserver 故障

master 使用心跳消息判断 chunkserver 是否存活,如果 master 判断 chunkserver 故障,它会指示其他 chunkserver 对不满足复制因子的数据块重新复制。当故障的 chunkserver 上线时,master 也会删除多余的副本,以及因错过突变而过时的副本。

其他功能

使用写时复制实现快照功能;跨机架的块放置策略;根据服务器的负载进行块放置和块迁移;根据优先级重新复制块以恢复冗余;延迟删除和垃圾收集;使用校验和检测数据块是否损坏;生成诊断日志(包含 chunkserver 的启动/关闭,RPC 的请求/回复记录,不包括文件数据)。

问题

Q:租约具体是如何工作的,master 和 chunkserver 如何判断租约是否过期?如何撤销租约?

A:论文没有说明,我猜测:master 首先发送授予租约的请求,chunkserver 收到请求之后开始计时,同时发送响应给 master,master 收到响应之后开始计时。这样 chunkserver 和 master 都可以判断租约是否到期,以及 master 的计时总是晚于 chunkserver,可以保证避免脑裂。撤销租约只修改 master 本地元数据肯定是不行的,因为客户端可能正在和持有租约的 chunkserver 通信,master 必须直接向 primary 发送撤销请求。

Q:根据上述猜测,如果授予租约的响应丢失,master 该如何处理?是否需要考虑时钟同步问题?

A:如果 master 没有收到响应,它就不会把该 chunkserver 记作 primary 返回给客户端,那么该租约实际上是一个无效租约,master 可以重试或者另选一个包含副本的 chunkserver。计算超时使用的是单调时钟,不需要同步。也可以使用时间戳判断租约是否到期,但是使用的是墙上时钟,需要服务器之间时钟同步。

总结

GFS 使用复制进行容错,从而引入多个副本间的一致性问题。但是仅保证宽松的一致性,而将问题交由应用程序处理,该设计基于其特殊的使用场景,顺序读写以及用于批处理,似乎强一致性显得不是很重要。可以查看课程的笔记和问答加深对论文的理解,笔记结尾的优缺点总结还是很不错的,有一个全局的视角,我有点过于关注一致性了。更多关于 GFS 的讨论,可以阅读 GFS: Evolution on Fast-forward

Threads and RPC

参考 Go FAQcrawler.gokv.gonote

Exercise: Web Crawler

串行爬虫

就是 DFS。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func Serial(url string, fetcher Fetcher, fetched map[string]bool) {
if fetched[url] {
return
}
fetched[url] = true
urls, err := fetcher.Fetch(url)
if err != nil {
return
}
for _, u := range urls {
Serial(u, fetcher, fetched)
}
return
}

并行爬虫

共享 + 锁

在 DFS 的基础上,同时启动多个线程来爬取网页,通过使用锁保护共享变量,使用计数器等待所有线程执行完成。如果网页很多,可能会创建非常多的线程,可以通过使用线程池限制线程的数量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
type fetchState struct {
mu sync.Mutex
fetched map[string]bool
}

func (fs *fetchState) testAndSet(url string) bool {
fs.mu.Lock()
defer fs.mu.Unlock()
r := fs.fetched[url]
fs.fetched[url] = true
return r
}

func ConcurrentMutex(url string, fetcher Fetcher, fs *fetchState) {
if fs.testAndSet(url) {
return
}
urls, err := fetcher.Fetch(url)
if err != nil {
return
}
var done sync.WaitGroup
for _, u := range urls {
done.Add(1)
go func(u string) {
defer done.Done()
ConcurrentMutex(u, fetcher, fs)
}(u)
}
done.Wait()
return
}

func makeState() *fetchState {
return &fetchState{fetched: make(map[string]bool)}
}

通道(channel)

使用 channel 实现同步,不需要 DFS,没有任何共享变量,自然也不需要使用锁(尽管 channel 内部会使用锁)。代码的组织方式有点像 MapReduce。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func worker(url string, ch chan []string, fetcher Fetcher) {
urls, err := fetcher.Fetch(url)
if err != nil {
ch <- []string{}
} else {
ch <- urls
}
}

func coordinator(ch chan []string, fetcher Fetcher) {
n := 1
fetched := make(map[string]bool)
for urls := range ch {
for _, u := range urls {
if fetched[u] == false {
fetched[u] = true
n += 1
go worker(u, ch, fetcher)
}
}
n -= 1
if n == 0 {
break
}
}
}

func ConcurrentChannel(url string, fetcher Fetcher) {
ch := make(chan []string)
go func() {
ch <- []string{url}
}()
coordinator(ch, fetcher)
}

Go RPC

远程过程调用(Remote procedure call,RPC),指程序调用的过程在远程计算机执行。通常表现为客户端向服务器发送请求,服务器向客户端返回响应。数据在通过网络传输时需要进行序列化和反序列化,序列化是将数据转换为可以存储或传输的格式的过程,反序列化是序列化的逆过程。远程过程调用和本地过程调用的区别在于,如何处理失败的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
//
// Client
//

func connect() *rpc.Client {
client, err := rpc.Dial("tcp", ":1234")
if err != nil {
log.Fatal("dialing:", err)
}
return client
}

func get(key string) string {
client := connect()
args := GetArgs{"subject"}
reply := GetReply{}
err := client.Call("KV.Get", &args, &reply)
if err != nil {
log.Fatal("error:", err)
}
client.Close()
return reply.Value
}

func put(key string, val string) {
client := connect()
args := PutArgs{"subject", "6.824"}
reply := PutReply{}
err := client.Call("KV.Put", &args, &reply)
if err != nil {
log.Fatal("error:", err)
}
client.Close()
}

//
// Server
//

type KV struct {
mu sync.Mutex
data map[string]string
}

func server() {
kv := &KV{data: map[string]string{}}
rpcs := rpc.NewServer()
rpcs.Register(kv)
l, e := net.Listen("tcp", ":1234")
if e != nil {
log.Fatal("listen error:", e)
}
go func() {
for {
conn, err := l.Accept()
if err == nil {
go rpcs.ServeConn(conn)
} else {
break
}
}
l.Close()
}()
}

func (kv *KV) Get(args *GetArgs, reply *GetReply) error {
kv.mu.Lock()
defer kv.mu.Unlock()

reply.Value = kv.data[args.Key]

return nil
}

func (kv *KV) Put(args *PutArgs, reply *PutReply) error {
kv.mu.Lock()
defer kv.mu.Unlock()

kv.data[args.Key] = args.Value

return nil
}

其他问题

Q:为什么闭包函数可以使用外部函数的变量?

A:闭包函数引用的外部变量存储在堆中,所以当外部函数返回时,变量依旧存在,垃圾收集器会根据引用计数判断是否回收该变量。注意,如果需要引用循环中的变量,需要通过参数传递(值传递),因为外部变量会随循环变化。

Q:假设有两个线程,线程 A 使用条件变量等待某个条件,线程 B 在达到条件时通知线程 A。如果线程 B 在解锁之前通知线程 A 会有问题吗?

A:如果在解锁之前唤醒线程 A,假设线程 A 在 B 解锁之前被调度,那么线程 A 在获取锁时会被阻塞。之后的某个时刻,操作系统会调度线程 B 解锁,此时线程 A 可以重新被调度。注意,条件变量和锁的阻塞队列是独立的,所以不需要再次通知线程 A。

MapReduce: Simplified Data Processing on Large Clusters

阅读论文 MapReduce,做个总结,部分内容参考课程的笔记

概念

MapReduce 是一个编程模型,用于处理和生成大型数据集。它将并行、容错、数据分布和负载均衡的复杂性隐藏在库中,使得没有任何并行和分布式系统经验的程序员可以轻松利用大型分布式系统的资源。

MapReduce 将计算(computation)抽象为 \(Map\) 和 \(Reduce\) 函数。通常,\(Map\) 函数处理一个键值对,生成一组中间键值对。然后 MapReduce 库将具有相同中间键的值存储在一个列表中,以迭代器的方式提供给 \(Reduce\) 函数,避免因为数据量太大从而溢出到磁盘。\(Reduce\) 函数处理一个中间键以及该键的值列表,然后将列表中的值合并。

以下是可以被抽象为 MapReduce 计算的问题:统计文档中单词的出现次数,Grep(模式匹配),倒排索引,排序等。

实现

MapReduce 的实现方式取决于使用场景,Google 的常见计算场景是通过交换式以太网连接的大型计算机集群,使用 GFS 管理存储在各个机器的磁盘上的数据。用户将作业(job)提交给调度系统,每个作业由多个任务(task)组成,通过调度系统映射到可用的机器上。有关场景的详细信息见论文,下面介绍基于该场景的 MapReduce 实现。

概述

用户程序在调用 \(MapReduce\) 函数之后,用户程序中的 MapReduce 库首先将输入文件划分为 \(M\) 块,通常每块 16-64 MB。然后将用户程序复制到集群的多个机器上运行,由一个 master 和多个 worker 组成,master 会将一个 map 任务或 reduce 任务分配给空闲的 worker。

GFS 将所有文件划分为 64 MB 的块,并且将每个块的多个副本存储在不同的机器上。MapReduce 的 master 将 map 任务分配给包含输入文件的机器,从而可以避免通过网络传输输入数据(节省网络带宽)。

map worker 从 GFS 中读取文件,解析键值对并将其传递给用户的 \(Map\) 函数。\(Map\) 函数生成的中间键值对会被分区函数划分为 \(R\) 个文件(每个 reduce 任务一个文件),存储在机器的本地磁盘。\(R\) 个中间文件的位置和大小信息会被发送给 master,然后 master 负责将这些信息发送给 reduce worker。

reduce worker 收到文件的位置和大小信息之后,使用远程过程调用(RPC)从 map worker 的磁盘读取文件。当 reduce worker 读取完 \(M\) 个中间文件时,它会将数据按照中间键排序,以使相同键的键值对彼此相邻。reduce worker 迭代已排序的数据,它将唯一键以及该键对应的值列表传递给用户的 \(Reduce\) 函数。\(Reduce\) 函数合并数据,将结果写入 GFS 中的一个文件(每个 reduce 任务一个文件)。PS:GFS 会将输出文件以多个副本块的形式存储在多个机器上。

当所有 map 和 reduce 任务完成之后,master 唤醒用户程序。此时,在用户程序中的 \(MapReduce\) 函数调用返回至用户代码。通常,用户不需要将这些文件合并,而是将这些文件作为另一个 MapReduce 作业的输入。

细节

Master 数据结构

对于每个 map 和 reduce 任务,master 会记录任务的状态(idle,in-progress,completed),以及非 idle 任务所在的 worker 的标识。对于每个 map 任务,master 会记录 \(R\) 个中间文件的位置和大小信息,总共记录 \(M\times R\) 个信息。

容错

worker 失效

master 定期对每个 worker 执行 ping 操作,如果超时未响应则将其标记为失效。失效 worker 已完成的任何 map 任务、正在执行的 map 和 reduce 任务都会被重置为 idle 状态,从而可以分配给其他 worker。

已完成的 map 任务在 map worker 发生故障时需要重新执行,因为它们的输出存储在失效机器的本地磁盘上,无法被访问。而已完成的 reduce 任务将输出存储在 GFS 中,所以在发生故障时不需要重新执行。

当失效的 map worker A 被 map worker B 替代时,将会通知所有 reduce worker 重新执行,任何没有从 A 读取数据的 reduce 任务将会从 B 读取数据。PS:已从 A 读取数据的 reduce 任务该如何处理,论文没有说明。我的想法是,如果已读取完,则不会有问题。如果读取到一半,那么直接丢弃即可。

master 失效

定期持久化 master 数据结构的检查点(checkpoint),如果 master 失效,则可以从上一个检查点状态重新开始。论文中表示单个 master 失效的可能性不大,所以发生故障时会中止 MapReduce 计算。

PS:检查点和失效点之间的状态会丢失,有可能已完成的 map 任务没有被 master 持久记录已完成。如果持久状态是 idle,则会导致重新分配 map 任务。如果持久状态是 in-progress,则需要等待 worker 发送完成信息,但 worker 如何知道需要重新发送完成信息,这需要额外的机制。

任务粒度

一个 map 任务处理一个文件,生成 \(R\) 个中间文件,分别由 \(R\) 个 reduce 任务处理。理论上,\(M\) 和 \(R\) 的大小应该远大于 worker 的数量,从而分配更多的任务给较快的机器,以均衡负载。如果 worker 发生故障,还可以通过将任务分散到多个 worker 上来提升故障恢复的速度。

备份任务

由于 MapReduce 操作需要等待所有任务完成才能够继续推进,所以少数缓慢的机器(straggler)会拖慢整个操作。当 MapReduce 操作接近完成时,master 可以冗余执行剩余的任务,以缩短 MapReduce 的执行时间。

问题

Q:如果网络延迟导致 master 将 worker 标记为失效,master 如何处理旧 map worker 发送的中间文件信息?GFS 如何处理重复 reduce 任务的输出?

A:对于 map 任务,论文提到 master 会忽略 map 任务的重复完成消息,但是它是否会接收旧 worker 的消息,毕竟已经分配新 worker 重新执行该 map 任务。不论如何,只要保证 reduce 任务只从其中一个中间文件读取数据,就没有问题。对于 reduce 任务,输出首先被写入临时文件,当任务完成时再重命名为最终文件,GFS 提供原子重命名(使用锁),从而相同 reduce 任务只会有一个输出文件。

总结

论文在介绍完实现之后,还提出对实现的改进、测试实现在 Grep 和 Sort 场景的性能等。MapReduce 的成功主要在于,隐藏各种细节使得 MapReduce 易于使用,很多问题都可以表示为 MapReduce 计算,而且作者开发的 MapReduce 实现可以有效利用大型计算机集群的资源。从中可以学到:限制编程模型可以使并行和分布式计算更容易,使计算具有容错性;网络带宽是稀缺资源,使用局部性优化可以节省网络带宽;冗余执行可以减少缓慢机器的影响。