Lab 3: Raft

课程网站实验网站实验指南

规则

在此列出比较通用且重要的规则:① 发送心跳 RPC 每秒不会超过 10 次。② 如果多数服务器之间可以通信的话,应该在旧领导者失效之后的 5 秒内选出新的领导者。③ election timeout 的范围应该比论文中提到的 \([150,300]\) 毫秒更大,因为实验要求的心跳消息的发送频率比论文中的更低。当然也不能太大,否则无法满足第 ② 点要求。④ 在循环中使用 rf.killed 函数,避免被测试程序 kill 的 peer 持续运行。

⑤ 每个 RPC 应该在单独的线程中发送和处理响应,因此它们不会相互阻塞,也不会阻塞当前函数,例如心跳和选举。而 RPC 的参数应该在启动线程时复制,因为 peer 的状态随时都可能变化,我们要保证 RPC 是以当前状态发送的。⑥ 发送 RPC 的是 labrpc.go 中的 Call 函数,如果返回 false,则表示没有收到响应。此时,只需要重试复制日志相关的 RPC,而不需要重试心跳和投票相关的 RPC。不重试时,记得直接退出函数,不要继续处理默认的“响应”。

⑦ 在收到 RPC 请求或响应时,比较任期大小,来确定是否需要将当前 peer 的 state 转变为 follower。⑧ 处理响应时,要首先检查所有假设,也就是此时 peer 的某些状态是否和发送请求时相同。如果不一致,则说明响应已经不适用。特别的,应该总是执行第 ⑦ 点。此处需要仔细思考,应该比较哪些状态。

Part 3A: leader election

实现

实现领导者选举和心跳检测。首先按照论文的图 2 在 Raft 结构体中添加状态,然后在 Make 函数中初始化。特别的,我们需要添加额外的状态信息,包括当前 peer 的 state(follower,candidate,leader),以及最近收到的有效 RPC 的时间戳,该时间戳用于判断 follower 是否需要转变为 candidate。有效是指,收到当前 leader 发送的 AppendEntries RPC,或者收到某个 candidate 的 RequestVote RPC 且决定投票给它。当前 leader 的含义需要搞清楚,不仅是任期不旧,而且日志前缀要匹配,也就是返回 success = true 才行。

实现 AppendEntries RPC 和 RequestVote RPC 处理程序,除了按照论文图 2 描述的步骤编写代码以及满足第 ⑦ 点之外,有几个需要注意的细节。如果 RPC 有效,则更新时间戳。理解 up-to-date 的含义,在论文 5.4.1 节的结尾有描述。特别的,对于 AppendEntries RPC,如果当前 peer 的 state 是 candidate,则在当前任期和 RPC 的任期相等时,仍然需要将 state 转变为 follower。

此外,论文图 2 对 RequestVote RPC 实现的描述有遗漏,即使 votedFor 不是 nullcandidateId,只要 term > currentTerm,在满足 up-to-date 的前提下,候选者也可以获得选票。因为,voteFor 的作用只是限制,一个 peer 在一个任期内只能投一次票,如果任期都不相同,当然就不需要做任何限制。

实现心跳检测的函数,首先使用 for 循环 + rf.killed 函数。心跳是周期性的,我们需要等待某个时间,满足第 ① 点。该等待的过程应该放在循环的最后,因为当 candidate 转变成 leader 时,需要立即发送心跳 RPC,防止其他 peer 发起新的选举。在循环开始时,我们需要检测当前 peer 是否仍是 leader,因为它可能转变为 follower。在发送 AppendEntries RPC 和处理响应时,需要满足第 ⑤⑥⑦⑧ 点。

实现超时选举的函数,也就是 ticker 函数,首先依然使用 for 循环 + rf.killed 函数。然后,等待随机化的 election timeout,满足第 ③ 点。应该将等待的代码发在循环的开头,因为初始时所有 peer 都是 follower,放在开头有助于避免选票分裂。然后思考谁会发起选举,显然只有 follower 和 candidate 会。那么它们发起选举的条件分别是什么?在 election timeout 内,如果 follower 没有收到有效 RPC,或者 candidate 没有成为 leader。如果满足条件,则将当前 peer 的 state 转变为 candidate,然后执行论文图 2 中指定的操作。

在发送 RequestVote RPC 和处理响应时,需要满足第 ⑤⑥⑦⑧ 点。如何处理投票的结果?比较自然的想法是使用通道,因为整个过程有点类似生产者消费者模型。更简单的方法是,在 Raft 结构体中记录获取的选票数,这样就可以在发送 RPC 的线程中处理响应。不过,要记得在发起选举时,初始化该字段。当 candidate 转变为 leader 时,记得初始化和 leader 相关的字段,包括 nextIndexmatchIndex

测试

如何执行多次测试,只保留失败测试的输出?不知道。

使用命令 time go test -race -run 3A -count 1 -failfast -timeout 0 | tee log 开始测试:

① 报错 race: limit on 8128 simultaneously alive goroutines is exceeded, dying。不使用 -race,第二个测试一直停不下来,查看日志发现某个 peer 一直不响应。检查代码发现,在退出某个函数时,忘记释放锁,使得该 peer 无法执行任何操作。② 报错 config.go:443: term 2 has 2 (>1) leaders。日志表明,peer 没有获取多数选票仍然当选 Leader。检查代码发现两处错误,某个判断条件写反,以及发送 RPC 没有排除自身,导致自己给自己多投一票。③ 报错 config.go:454: expected one leader, got none。应该是 election timeout 设置太大,没有在规定时间内选举出 Leader。

Part 3B: log

实现

实现日志复制,以及将已提交日志应用到状态机。首先谈谈 AppendEntries RPC 的更多细节,一定要认真读论文图 2 描述的步骤。特别注意,只会删除冲突的日志,不冲突的日志会保留。以及不要搞错更新 commitIndex 的方式,注意是和 “index of last new entry” 取最小值,也就是 args.PrevLogIndex + len(args.Entries)

实现将已提交日志应用到状态机的过程,我们在 Make 函数中单独创建一个线程来处理此过程。我们需要添加一个状态 applyIndex,表示最后应用到状态机的索引位置,初始化为零。然后依然使用 for 循环 + rf.killed 函数,循环执行该过程。什么时候应用到状态机?显然是当前 peer 的 commitIndex 推进时,可以使用 sync.Cond 或者 time.Sleep 来实现等待操作。接下来就很简单,只需按照顺序将已提交日志对应的 ApplyMsg 发送到通道。

实现日志复制,首先实现 Start 函数。逻辑上比较简单,如果当前 peer 是 leader,则将命令包装成日志,将其添加到自身的 log 中。然后,向其他 peer 发送 AppendEntries RPC,需要满足第 ⑤⑥⑦⑧ 点。注意看论文图 2 的描述,发送 RPC 有个前提条件 lastLogIndex >= nextIndex,以及发送的是 nextIndex 对应的日志,该日志不一定是当前命令的日志。简单起见,发送从 nextIndex 开始,之后的所有日志,这样也可以提升 peer 之间同步的速度,测试的运行时间会更短。参数中不要直接使用切片,而要使用切片的副本,否则可能会产生竞态条件。

处理 AppendEntries RPC 的响应,按照论文图 2 的规则编写代码就行,但是有更多细节需要注意。如果返回 success = true,该如何更新 nextIndexmatchIndex?直接将 nextIndex 更新为 args.PrevLogIndex + 1 + len(args.Entries) 是错误的,因为 Start 函数会不断地接收客户端的命令并发送 RPC,会同时存在多个发送给同一个 peer 的 RPC,而且它们都可以返回 success = true,然后 nextIndex 就会被不断地设置为最后一个响应的值。但是,并发 RPC 的顺序会有交错,之后响应的 RPC 的 args.PrevLogIndex + 1 + len(args.Entries) 值可能更小,所以更新时取 max(rf.nextIndex[i], args.PrevLogIndex + 1 + len(args.Entries))。如果返回 success = false,记得重试 RPC。

最后,比较有疑问的点就是 leader 如何推进 commitIndex,论文图 2 说的很明白,但是如何使用代码去实现呢?可以二分 \(N\) 的值,或者对 matchIndex 降序排序,取第 n / 2 + 1 个元素判断是否满足推进的条件,更优的做法是使用快速选择算法。那么应该何时检查条件,尝试推进?我的想法是,在处理 AppendEntries RPC 的响应时,如果某个 peer 的 matchIndex 增加,则做一次检查。特别注意,对 leader 的 matchIndex 做特殊判断。

测试

使用命令 time go test -race -run 3B -count 1 -failfast -timeout 0 | tee log 开始测试:

① 报错 panic: runtime error: index out of range [-1]。代码已经在选出 leader 时,将 nextIndex 正确初始化。说明索引是在递减之后越界,也就是说 AppendEntries RPC 的响应可能有误,检查之后发现在 RPC 处理程序中忘记执行赋值 reply.Success = true。② 报错 config.go:594: one(100) failed to reach agreement。检查日志是正常的,只能在测试代码中添加打印语句,然后发现误将 ApplyMsgCommand 字段设置成日志项,而不是日志项中的命令。③ 依然报错 config.go:594: one(100) failed to reach agreement。查看日志发现,日志提交到多数 peer,但是 leader 没有推进 commitIndex。明显是在统计个数时出现问题,查看代码发现原因是没有对 leader 的 matchIndex 做特殊处理。

④ 报错 config.go:601: one(106) failed to reach agreement。定位错误有点慢,主要是一直傻傻看日志,明明只要再多打印一条信息更详细的日志,就会很容易发现问题。处理 AppendEntries RPC 时,在 Success = false 的情况下,忘记重试 RPC。论文图 2 有说到,“decrement nextIndex and retry”。注意,重试前要根据当前状态重新设置参数的值。⑤ 日志很奇怪,出现没有被执行的操作,报错也找不到错。定位错误很慢,错误的原因是忘记在循环中判断当前 peer 是否被 kill。其实可以想到的,日志混乱就是被 kill 的 peer 执行打印语句造成的,而报错也是因为被 kill 的 peer 仍在执行某个操作。

⑥ 报错 apply error: commit index=2 server=4 1000 != server=2 30Start 函数并发发送 RPC,顺序交错,没有正确更新 nextIndex。就是在上面提到的,需要取两者的最大值。⑦ 报错 testing.go:1152: race detected during execution of test。查错发现,切片是引用类型,作为参数传递到各个位置,引发竞态条件。在最开始,Go 竞争检测器报告的堆栈信息显示,是由 log.Printf 触发的竞争,我还以为是包有问题。之后,某次测试报出和切片相关的竞争,才发现应该是参数的问题。关于切片的线程安全问题,可以查看Reddit 中的讨论

Part 3C: persistence

实现

实现 nextIndex 递减策略的优化,以及持久化必要的状态。比较简单,没有任何问题,测试报错基本上是因为之前的代码有问题。

测试

使用命令 time go test -race -run 3C -count 1 -failfast -timeout 0 | tee log 开始测试:

① 报错:race: limit on 8128 simultaneously alive goroutines is exceeded, dying。定位错误很慢,之前也遇到过该错误,不过是通过其他方式解决的。该错误其实就是因为创建太多线程,检查代码发现原因是,我会在日志 RPC 超时的时候进行重试,而每个 RPC 都在单独的线程中发送,所以无法通信的 peer 会累积很多重试线程。解决方案就是,不在 RPC 失败时重试,而使用定时重试。定时重试的代码和心跳检测基本上是一致的,区别只有是否包含日志项,所以实现时放一起很合理,并且这也可以额外满足只在空闲时发送心跳消息的规则(刚想起这个规则)。

② 报错 config.go:601: one(5634) failed to reach agreement。日志很清晰,问题出在 leader 变更之后,没有调用 Start 函数,使得日志没有同步。此时,定时重试起不到作用,因为 nextIndex 被初始化为 lastLogIndex + 1,而重试的前提是 lastLogIndex >= nextIndex。最开始想到的解决方案是,在当选 leader 时执行一次 AppendEntries RPC,包含索引位置为 nextIndex - 1 的日志项。但是有更好的做法,之所以会报错可以理解为心跳 RPC 没有在冲突时递减 nextIndex,因为我以为不需要处理心跳返回的 success 值,如果按正常流程处理其响应,那么就不会出现这个错误。

Part 3D: log compaction

实现

使用快照实现日志压缩,首先需要明白整体的流程,阅读 Diagram of Raft interactions 会很有帮助。如何创建快照?快照是由状态机/应用层创建的,然后对每个 peer 调用 Snapshot 函数来传递快照,此时 peer 可以丢弃快照包含的日志。而且 peer 也会负责快照的持久化,我们可以修改 persist 函数来实现。值的注意的是,peer 并不负责读取快照,状态机/应用层会读取 peer 持久化的快照。

实现 Snapshot 函数,需要在 Raft 结构体中添加两个状态,日志偏移量和快照。如果快照包含直到 log[Index] 的日志信息,我们会丢弃 Index 之前的所有日志,但是不会丢弃 Index 位置的日志,以便 AppendEntries RPC 对日志做一致性检查。在丢弃日志时需要注意,不要使用切片引用,而是复制切片,否则日志无法进行垃圾回收。Java 中 substring 函数就是因为使用引用可能会保留太多无效数据,而无法进行垃圾回收,所以改为复制的实现方式。

以上两个状态都需要持久化,而且我们需要修改必要的代码,以支持日志偏移量。首先修改 applyIndex 的初始值为日志偏移量,然后修改所有和 log 的索引访问以及长度有关的代码。修改代码感觉侵入性很强,很多地方都需要加减日志偏移量,可读性差并且易错。我想到的解决方案是,将获取日志长度和日志项的代码包装在单独的函数内,其他地方只需要调用对应的函数,从而不会导致混乱。注意,有些代码可能需要特殊处理。

当 leader 不包含 nextIndex - 1 位置的日志时,会调用该 InstallSnapshot RPC 而不是 AppendEntries RPC。特别注意,即使有以上限制,依然需要修改 AppendEntries RPC 处理程序,使其能够正确处理发送的日志项已经被 follower 压缩为快照的情况,因为 RPC 是并发的并且有可能滞后。简单的处理方式是,如果日志项被包含在快照中,则返回 success = true。修改代码时要非常小心,不要执行不必要的操作,从而导致索引越界,或者其他不正确的行为。

实现 InstallSnapshot RPC 处理程序,需要满足第 ⑤⑥⑦⑧ 点。处理程序的框架可以参考 AppendEntries RPC 的实现,实现可以参考论文图 2 描述的步骤。当 follower 收到有效的快照时,需要向 applyCh 通道发送消息。在发送消息之前,需要修改 applyIndex = max(rf.applyIndex, rf.logOffset),之所以取 max 是因为有可能收到描述当前日志前缀的快照。最后,如果 RPC 返回响应且满足第 ⑧ 点,需要取最大值来更新 nextIndex

测试

使用命令 time go test -race -run 3D -count 1 -failfast -timeout 0 | tee log 开始测试:

① 测试无法停止或者卡住,日志表明 leader 在向 applyCh 发送消息时阻塞。我之前就想过 applyCh 有可能会阻塞,但是测试通过就以为它不会阻塞,但是果然通道还是会阻塞。一定记住,不要假设任何不存在的前提条件。② 报错 snapshot decode error。定位错误有点慢,添加打印语句发现参数的快照字段,在发送 RPC 之前有值,但在接收 RPC 时没有值。很明显是字段首字母没有大写,从而没有将字段导出。

③ 报错 apply error: server 0 apply out of order, expected index 1, got 10。很明显是应用层没有读取到快照,在我的代码中,该错误的触发条件是连续两次崩溃重启。原因是 peer 重启时没有读取快照,那么在之后的 persist 就会将已有的快照覆盖为 nil,然后第二次重启应用层没有读取到快照,从而产生错误。④ 索引越界,当 AppendEntries RPC 响应 success = false 时,有可能 reply.XIndex 位置的日志已经被丢弃,需要特判。

问题

在调试时发现,如果 leader 转变为 follower 之后,对应的 election timeout 正好结束,那么由于该 follower 没有设置最近收到的有效 RPC 的时间戳,所以会直接转变为 candidate,发起选举。我的想法是,可以在 leader 转变为 follower 时,重置超时时间。但是,要允许重置就需要在 Raft 结构体中额外添加状态。

在处理 RPC 响应之前,到底应该检查什么假设,一般会检查任期是否和发送时一致。但是,如何判断其他状态是否需要做一致性检查?处理 AppendEntries RPC 响应之前,如果 nextIndex 改变,只要任期没变,也依然可以处理该响应,因为这是正确的。为什么任期改变,就不能继续处理响应?因为当前 peer 可能已经不是 leader,或者虽然是 leader,但是 nextIndex 已经重新初始化,继续处理该响应会造成错误。

处理 RequestVote RPC 响应之前,如果 len(rf.log) 改变且任期没变,是否可以处理该响应?len(rf.log) 改变,说明当前 peer 收到有效的 AppendEntries RPC,它已经和 leader 取得联系转变为 follower,所以不需要处理该响应。类似的问题还可以提出很多,总之确定应该检查什么状态,来维持程序的正确性,需要仔细思考。

疑问:有可能收到当前快照的前缀快照吗?发送快照消息给 applyCh 时,解锁是否有可能产生错误?更新 nextIndex 时,由于 RPC 的并发性,总是需要取最大值来更新?之前处理 AppendEntries RPC 的响应时,并发返回 success = false,是否有可能不正确的递减 nextIndex,甚至使其为负数?

总结

实验有很多细节点,有时以为自己知道该怎么做,但在编写代码却没有完全按照准则说的做,往往会遗漏某个前提条件或者某个操作,以及理解错某个概念。在做每一步之前,最好问一下自己,是否应该这样做,有什么特殊情况。日志对调试的帮助很大,我还是第一次打印这么大量且详细的日志,来调试并发程序。代码中同时存在多个错误真的很难搞,有时修复一个错误之后仍然报错,那么有可能是修复时引入的错误,或者之前代码中遗留的错误。在测试的后半部分,有种面向测试查错的感觉,个人感觉不是很好,最好还是能自己顺着逻辑找出错误。如果隐藏测试代码,估计得花费更长时间查错。


创建 go-test-many.sh 脚本,使用命令 bash ./go-test-many.sh 1000 8 开始并行测试。测试 Test (3C): Figure 8 (unreliable) ... 失败一次,报错 config.go:601: one(9042) failed to reach agreement。日志表明,follower 没有在限定时间内和 leader 同步。由于我的代码在某些情况下,只会传递 nextIndex 位置的日志,而不是之后的所有日志,此时会有同步过慢的问题,简单修改一下就可以解决。

作者

Ligh0x74

发布于

2024-09-07

更新于

2024-09-14

许可协议

评论