Lab 5: Sharded Key/Value Service

课程网站实验网站实验指南

Part A: The Controller and Static Sharding

实现

基本上就是复制 lab4 的代码,实现平衡分片到服务器的代码会花点时间,像在做算法题。

测试

使用命令 time go test -race -run 5A -count 1 -failfast -timeout 0 开始测试:

① 测试 TestMulti,报错 test_test.go:61: Shards wrong。日志显示,不同状态机的 Shards 数组包含相同的元素,但是顺序不同。可以想到,错误原因在于改变状态机的代码是非确定性的,由于提示中有说 map 遍历顺序的不确定性,所以我们可以很快定位错误。要使遍历顺序具有确定性,只好将 map 中的键放在数组中,排序之后遍历。

Part B: Shard Movement

实现

假设副本组 G1 负责的分片 S1 需要移动到副本组 G2。在 G1 发送分片之后,以及 G2 接收分片之前,都不应该处理和该分片相关的客户端请求,不论是新收到的请求还是已经在 applyCh 中的请求。G1 应该返回 ErrWrongGroup,G2 应该使用条件变量等待分片的到达。和 lab4 相同,如果服务器发现其已经不是 leader,则不应该继续等待。而且,G2 必须将接收的分片使用 Raft 应用到状态机之后,才能返回响应。为了方便的发送和删除分片,服务器应该按分片来存储数据。如果服务器需要发送多个分片,可以简单地串行发送,更好的做法是,并发地向不同副本组发送分片,在单个 RPC 中包含所有需要发向该副本组的分片。

如果在 G2 更新配置之前,G1 发送的分片到达 G2,该如何处理?在请求中应该包含配置编号,如果 G2 发现更大的配置编号,则使用条件变量等待配置更新,再处理该请求。如果 G1 发生故障,G2 可能会收到重复的分片,在将分片应用到状态机之前,应该限制该分片所属的配置编号和当前配置相同,并且使用副本组编号和配置编号的组合来去重。如果分片的配置编号小于当前配置编号,则直接响应 OK

对于客户端请求,必须实现跨副本组的重复检测。例如,副本组 G1 已执行客户端的请求但未响应,然后 G1 向 G2 发送分片,最终客户端会向 G2 发送重复的请求。简单的解决方案是,G1 不仅发送分片,还发送用于重复检测的数据,所以重复检测的数据最好也是按照分片来存储。

服务器需要定期检查配置是否变更,在一个副本组中,只有 leader 会检查,还是所有副本都会检查?应该是只有 leader 检查配置变更,然后使用 Raft 将变更复制到其他副本中。因为不同副本检查配置变更的时机不同,服务器拒绝 applyCh 中相关请求的时机也就不同,从而状态机会不一致。在将配置变更应用到状态机之后,才开始发送分片,从而确保不同副本都会发送相同的分片。

在检查配置变更时,需要根据配置编号获取下一个配置,而不能直接获取最新配置,因为在未检查期间配置可能变更多次。如果 Raft 选举出新的 leader,它会检查下一个配置变更将其添加到日志,若此时通道中已经存在旧 leader 提交的一个/多个配置变更的日志,有可能会导致不正确的覆盖。所以,在将变更应用到状态机之前,应该总是限制配置编号是单调递增的。

配置必须线性地变更,如果配置变更多次,之后的变更必须等待之前的变更完成。可以将等待发送和接收的分片记录到状态中,然后使用条件变量检查状态来确定之前的配置是否完成。特别的,如果配置的编号为 1,则不需要记录状态。如果 leader 在变更配置的过程中崩溃,新的 leader 需要知道当前配置是否变更完成,所以上述状态应该使用 Raft 复制到副本中。同时,创建一个线程定期检查(或者使用条件变量),如果当前服务器是 leader,则发送状态中记录的待发送分片,发送完成之后删除对应分片。

测试

使用命令 time go test -race -run 5B -count 1 -failfast -timeout 0 开始测试:

① 在将配置应用到状态机之后,没有拒绝 applyCh 中的相关请求。 ② 在处理发送分片的 RPC 的响应时,忘记检查假设,需要确保当前配置编号和发送 RPC 时相同。 ③ 读取快照的语句放在初始化之前,导致错误的状态。 ④ Op 中的 map 会和 Rafte.Encode(rf.log) 竞争,所以总是使用 map 的副本。 ⑤ 在修改状态时,没有唤醒条件变量。 ⑥ 有些状态只有 leader 会修改,所以 follower 需要特殊处理。

⑦ 如果 leader 接收分片,使用 Raft 复制到多数副本,在返回响应之后所有副本都崩溃。重启之后,由于 Raft 对提交之前任期的日志条目存在限制,该日志无法提交。此时,客户端请求该分片的数据将会阻塞直到日志应用,而日志无法提交除非在当前任期提交一个日志。解决方案就是,在重启之后总是添加一个空日志条目,来推进 commitIndex。之前测试有发生过低概率的报错,我当时就怀疑是 Raft 的实现有问题。

总结

Part B 有点折磨,有些很蠢的错误,花费很长时间才定位到。例如 ⑥,主要是太相信某个部分不会出错,结果就是那个地方有问题。例如 ①,以为已经实现某个功能,实际上存在遗漏,在定位错误时就不会往那个方面想。原因在于,没有在完成部分功能时做相关的测试,从而将发现错误的时间延后,此时代码已经有很多其他逻辑,定位错误会更加困难。错误 ②③④⑤ 是细节上的遗漏,因为功能实现并不是线性的过程,添加某个功能会修改已经实现的部分,逻辑交互在一起难免会有遗漏,特别是在代码组织很糟糕的情况下。整体来说,程序的并发运行会产生意想不到的交错,特别是存在网络或者机器故障的情况下,真的很难在最开始就写出正确的代码。

Lab 4: Fault-tolerant Key/Value Service

课程网站实验网站实验指南

单个客户端只会串行发送 RPC,但服务器会收到多个客户端的 RPC。

Part A: Key/value service without snapshots

实现

实现 ClerkGetPutAppend 方法。如果不知道谁是 leader,则迭代地向每个服务器发送 RPC。直到某个服务器回复操作成功,此时记录该服务器为 leader,以免每次发送 RPC 都需要重新确定 leader。如果标记为 leader 的服务器回复操作失败或者超时未回复,依然需要迭代所有服务器。因为 leader 随时可能变化,甚至是在迭代的过程中,所以需要在迭代的外层添加一个无限循环。

实现 KVServerGetPutAppend 方法。基本的结构是,首先调用 rf.Start 方法,如果不是 leader 则响应 ErrWrongLeader。否则,在正常情况下,我们等待通道中出现对应的命令,将命令应用到状态机,然后返回响应。由于 rf.Start 方法不保证会提交指定的命令,我们需要一种能够检测当前服务器已经不是 leader 的方法,从而避免客户端阻塞在非 leader 服务器上。方式如下,可以定时调用 rf.GetState 检查当前服务器的任期是否和之前调用 Start 得到的任期不一致,或者在通道中出现和请求不匹配的消息。

对于每个服务器,我们应该使用一个线程来读取 ApplyMsg,不断地将已提交的命令应用到状态机。我们不能等待 RPC 去读取对应的 ApplyMsg,因为只有 leader 会收到 Clerk 的 RPC,此时其他服务器需要自动同步。需要注意,不是所有提交的命令都会应用到状态机,因为会存在重复的请求。例如,当前 leader 在提交日志之后崩溃,客户端没有收到响应,然后向新的 leader 发送重复的请求,如果此时命令没有被应用到状态机,依然会调用 rf.Start 方法,重复的请求会出现在日志中。

我们需要为每个客户端记录其最后应用到状态机的命令对应请求的标识,来过滤重复的请求。特别的,Get 请求不会对状态机产生影响,并且将 Get 请求放入日志只是为保证单个客户端视图的单调性,所以我们不需要过滤重复的 Get 请求,也就不需要保存上次 Get 请求的结果,而总是应该返回最新的结果。对于每个 Clerk,只缓存上次请求的 RequestId 是否有可能产生错误?例如,某个很早之前发送的 RPC 到达 KVServer,而对应的 RequestId 已经被覆盖。不会发生该情况,TCP 会保证发送顺序和到达顺序一致。

测试

使用命令 time go test -race -run 4A -count 1 -failfast -timeout 0 开始测试:

① 测试无法停止,查看日志发现,客户端请求阻塞在非 leader 服务器上。通过定时检查服务器的任期,可以解决该问题。

Part B: Key/value service with snapshots

实现

什么时候检查大小?可以定时检查。如果超过大小,该执行什么操作?调用 rf.Snapshot。什么时候读取快照?在服务器启动时,以及从通道中。特别的,如果 maxraftstate = -1,则不需要快照。通道中的快照是否有可能比服务器的当前状态更小?

测试

使用命令 time go test -race -run 4B -count 1 -failfast -timeout 0 开始测试:

① 报错 panic: runtime error: slice bounds out of range [-190:]。Raft 发送快照给服务器,而服务器也发送快照给 Raft,状态交叠从而产生错误。要解决该问题,第一个想法是在 Raft 向服务器发送快照时不释放锁,但是有可能产生死锁。第二个想法是,在服务器从通道接收快照消息之后,再更新 Raft 的状态。但是还是不行,如果服务器读取快照消息却没有应该到状态机,然后服务器在 Raft 更新状态之前,调用 persister.RaftStateSize 检查大小,此时 Raft 更新状态,之后服务器调用 rf.Snapshot 方法传递一个更小的快照,必然会产生索引越界。或者,服务器调用 rf.Snapshot 方法传递一个更大的快照,然后 Raft 更新状态,也会索引越界。而且,在之后更新状态必须重新检查假设,很麻烦。简单起见,依然在解锁之前更新状态,同时在 rf.Snapshot 中过滤更小的快照。

② 测试 TestSnapshotRecover4B,报错 test_test.go:148: duplicate element x 0 333 y in Append result。定位错误很慢,日志太长不好看,看半天看不出问题。原因是服务器从通道读取快照之后,通道中有快照已经包含的命令消息,所以会重复应用命令。在应用消息到状态机之前,需要限制消息的日志索引必须大于最后应用到状态机的日志索引。其实只要查看修改状态机的关键代码,应该是可以很快找到错误的。

③ 测试 TestSnapshotRecoverManyClients4B,报错 test_test.go:293: get wrong value, key 19。错误的信息表明状态缺失,定位错误很慢,主要日志太多,打印的信息也不全,经常需要添加打印信息然后重新测试,果然应该在最开始就打印所有信息。错误的原因在 Raft 代码,没有限制快照消息在日志消息之前发送,服务器会提前收到快照之后的命令,使得服务器在收到快照时会丢弃快照,因为最后应用的命令索引大于快照的最后索引。解决方案很简单,只需要做一下限制就行。

总结

每次做实验都会遇到两个难点,一个是功能实现,一个是定位错误。

关于功能实现:例如,如何实现将 ApplyMsg 应用到状态机?最开始,我是想让每个 RPC 去读取消息,然后将其应用到状态机。但是,思考之后,会发现不可行,然后又去想其他方案。实现方式的设计依赖于对整个交互逻辑的正确理解,难点在于处理异常情况,像是 leader 变更、服务器崩溃。

关于定位错误:总是应该在日志中包含所有信息,避免需要添加信息然后重新测试的情况,会非常痛苦。定位错误不要只看日志,一定要先思考可能是哪个部分有问题,比如说状态机的状态有问题,那么在什么情况下会修改状态?找到修改状态的代码,然后思考这段代码在什么情况下会有问题。如果代码没有问题,那么想一想是不是外界的输入有问题,什么样的输入会产生该错误?然后,我们就可以找到外部依赖的代码,确定是否有可能产生该输入序列。

Lab 3: Raft

课程网站实验网站实验指南

规则

在此列出比较通用且重要的规则:① 发送心跳 RPC 每秒不会超过 10 次。② 如果多数服务器之间可以通信的话,应该在旧领导者失效之后的 5 秒内选出新的领导者。③ election timeout 的范围应该比论文中提到的 \([150,300]\) 毫秒更大,因为实验要求的心跳消息的发送频率比论文中的更低。当然也不能太大,否则无法满足第 ② 点要求。④ 在循环中使用 rf.killed 函数,避免被测试程序 kill 的 peer 持续运行。

⑤ 每个 RPC 应该在单独的线程中发送和处理响应,因此它们不会相互阻塞,也不会阻塞当前函数,例如心跳和选举。而 RPC 的参数应该在启动线程时复制,因为 peer 的状态随时都可能变化,我们要保证 RPC 是以当前状态发送的。⑥ 发送 RPC 的是 labrpc.go 中的 Call 函数,如果返回 false,则表示没有收到响应。此时,只需要重试复制日志相关的 RPC,而不需要重试心跳和投票相关的 RPC。不重试时,记得直接退出函数,不要继续处理默认的“响应”。

⑦ 在收到 RPC 请求或响应时,比较任期大小,来确定是否需要将当前 peer 的 state 转变为 follower。⑧ 处理响应时,要首先检查所有假设,也就是此时 peer 的某些状态是否和发送请求时相同。如果不一致,则说明响应已经不适用。特别的,应该总是执行第 ⑦ 点。此处需要仔细思考,应该比较哪些状态。

Part 3A: leader election

实现

实现领导者选举和心跳检测。首先按照论文的图 2 在 Raft 结构体中添加状态,然后在 Make 函数中初始化。特别的,我们需要添加额外的状态信息,包括当前 peer 的 state(follower,candidate,leader),以及最近收到的有效 RPC 的时间戳,该时间戳用于判断 follower 是否需要转变为 candidate。有效是指,收到当前 leader 发送的 AppendEntries RPC,或者收到某个 candidate 的 RequestVote RPC 且决定投票给它。当前 leader 的含义需要搞清楚,不仅是任期不旧,而且日志前缀要匹配,也就是返回 success = true 才行。

实现 AppendEntries RPC 和 RequestVote RPC 处理程序,除了按照论文图 2 描述的步骤编写代码以及满足第 ⑦ 点之外,有几个需要注意的细节。如果 RPC 有效,则更新时间戳。理解 up-to-date 的含义,在论文 5.4.1 节的结尾有描述。特别的,对于 AppendEntries RPC,如果当前 peer 的 state 是 candidate,则在当前任期和 RPC 的任期相等时,仍然需要将 state 转变为 follower。

此外,论文图 2 对 RequestVote RPC 实现的描述有遗漏,即使 votedFor 不是 nullcandidateId,只要 term > currentTerm,在满足 up-to-date 的前提下,候选者也可以获得选票。因为,voteFor 的作用只是限制,一个 peer 在一个任期内只能投一次票,如果任期都不相同,当然就不需要做任何限制。

实现心跳检测的函数,首先使用 for 循环 + rf.killed 函数。心跳是周期性的,我们需要等待某个时间,满足第 ① 点。该等待的过程应该放在循环的最后,因为当 candidate 转变成 leader 时,需要立即发送心跳 RPC,防止其他 peer 发起新的选举。在循环开始时,我们需要检测当前 peer 是否仍是 leader,因为它可能转变为 follower。在发送 AppendEntries RPC 和处理响应时,需要满足第 ⑤⑥⑦⑧ 点。

实现超时选举的函数,也就是 ticker 函数,首先依然使用 for 循环 + rf.killed 函数。然后,等待随机化的 election timeout,满足第 ③ 点。应该将等待的代码发在循环的开头,因为初始时所有 peer 都是 follower,放在开头有助于避免选票分裂。然后思考谁会发起选举,显然只有 follower 和 candidate 会。那么它们发起选举的条件分别是什么?在 election timeout 内,如果 follower 没有收到有效 RPC,或者 candidate 没有成为 leader。如果满足条件,则将当前 peer 的 state 转变为 candidate,然后执行论文图 2 中指定的操作。

在发送 RequestVote RPC 和处理响应时,需要满足第 ⑤⑥⑦⑧ 点。如何处理投票的结果?比较自然的想法是使用通道,因为整个过程有点类似生产者消费者模型。更简单的方法是,在 Raft 结构体中记录获取的选票数,这样就可以在发送 RPC 的线程中处理响应。不过,要记得在发起选举时,初始化该字段。当 candidate 转变为 leader 时,记得初始化和 leader 相关的字段,包括 nextIndexmatchIndex

测试

如何执行多次测试,只保留失败测试的输出?不知道。

使用命令 time go test -race -run 3A -count 1 -failfast -timeout 0 | tee log 开始测试:

① 报错 race: limit on 8128 simultaneously alive goroutines is exceeded, dying。不使用 -race,第二个测试一直停不下来,查看日志发现某个 peer 一直不响应。检查代码发现,在退出某个函数时,忘记释放锁,使得该 peer 无法执行任何操作。② 报错 config.go:443: term 2 has 2 (>1) leaders。日志表明,peer 没有获取多数选票仍然当选 Leader。检查代码发现两处错误,某个判断条件写反,以及发送 RPC 没有排除自身,导致自己给自己多投一票。③ 报错 config.go:454: expected one leader, got none。应该是 election timeout 设置太大,没有在规定时间内选举出 Leader。

Part 3B: log

实现

实现日志复制,以及将已提交日志应用到状态机。首先谈谈 AppendEntries RPC 的更多细节,一定要认真读论文图 2 描述的步骤。特别注意,只会删除冲突的日志,不冲突的日志会保留。以及不要搞错更新 commitIndex 的方式,注意是和 “index of last new entry” 取最小值,也就是 args.PrevLogIndex + len(args.Entries)

实现将已提交日志应用到状态机的过程,我们在 Make 函数中单独创建一个线程来处理此过程。我们需要添加一个状态 applyIndex,表示最后应用到状态机的索引位置,初始化为零。然后依然使用 for 循环 + rf.killed 函数,循环执行该过程。什么时候应用到状态机?显然是当前 peer 的 commitIndex 推进时,可以使用 sync.Cond 或者 time.Sleep 来实现等待操作。接下来就很简单,只需按照顺序将已提交日志对应的 ApplyMsg 发送到通道。

实现日志复制,首先实现 Start 函数。逻辑上比较简单,如果当前 peer 是 leader,则将命令包装成日志,将其添加到自身的 log 中。然后,向其他 peer 发送 AppendEntries RPC,需要满足第 ⑤⑥⑦⑧ 点。注意看论文图 2 的描述,发送 RPC 有个前提条件 lastLogIndex >= nextIndex,以及发送的是 nextIndex 对应的日志,该日志不一定是当前命令的日志。简单起见,发送从 nextIndex 开始,之后的所有日志,这样也可以提升 peer 之间同步的速度,测试的运行时间会更短。参数中不要直接使用切片,而要使用切片的副本,否则可能会产生竞态条件。

处理 AppendEntries RPC 的响应,按照论文图 2 的规则编写代码就行,但是有更多细节需要注意。如果返回 success = true,该如何更新 nextIndexmatchIndex?直接将 nextIndex 更新为 args.PrevLogIndex + 1 + len(args.Entries) 是错误的,因为 Start 函数会不断地接收客户端的命令并发送 RPC,会同时存在多个发送给同一个 peer 的 RPC,而且它们都可以返回 success = true,然后 nextIndex 就会被不断地设置为最后一个响应的值。但是,并发 RPC 的顺序会有交错,之后响应的 RPC 的 args.PrevLogIndex + 1 + len(args.Entries) 值可能更小,所以更新时取 max(rf.nextIndex[i], args.PrevLogIndex + 1 + len(args.Entries))。如果返回 success = false,记得重试 RPC。

最后,比较有疑问的点就是 leader 如何推进 commitIndex,论文图 2 说的很明白,但是如何使用代码去实现呢?可以二分 \(N\) 的值,或者对 matchIndex 降序排序,取第 n / 2 + 1 个元素判断是否满足推进的条件,更优的做法是使用快速选择算法。那么应该何时检查条件,尝试推进?我的想法是,在处理 AppendEntries RPC 的响应时,如果某个 peer 的 matchIndex 增加,则做一次检查。特别注意,对 leader 的 matchIndex 做特殊判断。

测试

使用命令 time go test -race -run 3B -count 1 -failfast -timeout 0 | tee log 开始测试:

① 报错 panic: runtime error: index out of range [-1]。代码已经在选出 leader 时,将 nextIndex 正确初始化。说明索引是在递减之后越界,也就是说 AppendEntries RPC 的响应可能有误,检查之后发现在 RPC 处理程序中忘记执行赋值 reply.Success = true。② 报错 config.go:594: one(100) failed to reach agreement。检查日志是正常的,只能在测试代码中添加打印语句,然后发现误将 ApplyMsgCommand 字段设置成日志项,而不是日志项中的命令。③ 依然报错 config.go:594: one(100) failed to reach agreement。查看日志发现,日志提交到多数 peer,但是 leader 没有推进 commitIndex。明显是在统计个数时出现问题,查看代码发现原因是没有对 leader 的 matchIndex 做特殊处理。

④ 报错 config.go:601: one(106) failed to reach agreement。定位错误有点慢,主要是一直傻傻看日志,明明只要再多打印一条信息更详细的日志,就会很容易发现问题。处理 AppendEntries RPC 时,在 Success = false 的情况下,忘记重试 RPC。论文图 2 有说到,“decrement nextIndex and retry”。注意,重试前要根据当前状态重新设置参数的值。⑤ 日志很奇怪,出现没有被执行的操作,报错也找不到错。定位错误很慢,错误的原因是忘记在循环中判断当前 peer 是否被 kill。其实可以想到的,日志混乱就是被 kill 的 peer 执行打印语句造成的,而报错也是因为被 kill 的 peer 仍在执行某个操作。

⑥ 报错 apply error: commit index=2 server=4 1000 != server=2 30Start 函数并发发送 RPC,顺序交错,没有正确更新 nextIndex。就是在上面提到的,需要取两者的最大值。⑦ 报错 testing.go:1152: race detected during execution of test。查错发现,切片是引用类型,作为参数传递到各个位置,引发竞态条件。在最开始,Go 竞争检测器报告的堆栈信息显示,是由 log.Printf 触发的竞争,我还以为是包有问题。之后,某次测试报出和切片相关的竞争,才发现应该是参数的问题。关于切片的线程安全问题,可以查看Reddit 中的讨论

Part 3C: persistence

实现

实现 nextIndex 递减策略的优化,以及持久化必要的状态。比较简单,没有任何问题,测试报错基本上是因为之前的代码有问题。

测试

使用命令 time go test -race -run 3C -count 1 -failfast -timeout 0 | tee log 开始测试:

① 报错:race: limit on 8128 simultaneously alive goroutines is exceeded, dying。定位错误很慢,之前也遇到过该错误,不过是通过其他方式解决的。该错误其实就是因为创建太多线程,检查代码发现原因是,我会在日志 RPC 超时的时候进行重试,而每个 RPC 都在单独的线程中发送,所以无法通信的 peer 会累积很多重试线程。解决方案就是,不在 RPC 失败时重试,而使用定时重试。定时重试的代码和心跳检测基本上是一致的,区别只有是否包含日志项,所以实现时放一起很合理,并且这也可以额外满足只在空闲时发送心跳消息的规则(刚想起这个规则)。

② 报错 config.go:601: one(5634) failed to reach agreement。日志很清晰,问题出在 leader 变更之后,没有调用 Start 函数,使得日志没有同步。此时,定时重试起不到作用,因为 nextIndex 被初始化为 lastLogIndex + 1,而重试的前提是 lastLogIndex >= nextIndex。最开始想到的解决方案是,在当选 leader 时执行一次 AppendEntries RPC,包含索引位置为 nextIndex - 1 的日志项。但是有更好的做法,之所以会报错可以理解为心跳 RPC 没有在冲突时递减 nextIndex,因为我以为不需要处理心跳返回的 success 值,如果按正常流程处理其响应,那么就不会出现这个错误。

Part 3D: log compaction

实现

使用快照实现日志压缩,首先需要明白整体的流程,阅读 Diagram of Raft interactions 会很有帮助。如何创建快照?快照是由状态机/应用层创建的,然后对每个 peer 调用 Snapshot 函数来传递快照,此时 peer 可以丢弃快照包含的日志。而且 peer 也会负责快照的持久化,我们可以修改 persist 函数来实现。值的注意的是,peer 并不负责读取快照,状态机/应用层会读取 peer 持久化的快照。

实现 Snapshot 函数,需要在 Raft 结构体中添加两个状态,日志偏移量和快照。如果快照包含直到 log[Index] 的日志信息,我们会丢弃 Index 之前的所有日志,但是不会丢弃 Index 位置的日志,以便 AppendEntries RPC 对日志做一致性检查。在丢弃日志时需要注意,不要使用切片引用,而是复制切片,否则日志无法进行垃圾回收。Java 中 substring 函数就是因为使用引用可能会保留太多无效数据,而无法进行垃圾回收,所以改为复制的实现方式。

以上两个状态都需要持久化,而且我们需要修改必要的代码,以支持日志偏移量。首先修改 applyIndex 的初始值为日志偏移量,然后修改所有和 log 的索引访问以及长度有关的代码。修改代码感觉侵入性很强,很多地方都需要加减日志偏移量,可读性差并且易错。我想到的解决方案是,将获取日志长度和日志项的代码包装在单独的函数内,其他地方只需要调用对应的函数,从而不会导致混乱。注意,有些代码可能需要特殊处理。

当 leader 不包含 nextIndex - 1 位置的日志时,会调用该 InstallSnapshot RPC 而不是 AppendEntries RPC。特别注意,即使有以上限制,依然需要修改 AppendEntries RPC 处理程序,使其能够正确处理发送的日志项已经被 follower 压缩为快照的情况,因为 RPC 是并发的并且有可能滞后。简单的处理方式是,如果日志项被包含在快照中,则返回 success = true。修改代码时要非常小心,不要执行不必要的操作,从而导致索引越界,或者其他不正确的行为。

实现 InstallSnapshot RPC 处理程序,需要满足第 ⑤⑥⑦⑧ 点。处理程序的框架可以参考 AppendEntries RPC 的实现,实现可以参考论文图 2 描述的步骤。当 follower 收到有效的快照时,需要向 applyCh 通道发送消息。在发送消息之前,需要修改 applyIndex = max(rf.applyIndex, rf.logOffset),之所以取 max 是因为有可能收到描述当前日志前缀的快照。最后,如果 RPC 返回响应且满足第 ⑧ 点,需要取最大值来更新 nextIndex

测试

使用命令 time go test -race -run 3D -count 1 -failfast -timeout 0 | tee log 开始测试:

① 测试无法停止或者卡住,日志表明 leader 在向 applyCh 发送消息时阻塞。我之前就想过 applyCh 有可能会阻塞,但是测试通过就以为它不会阻塞,但是果然通道还是会阻塞。一定记住,不要假设任何不存在的前提条件。② 报错 snapshot decode error。定位错误有点慢,添加打印语句发现参数的快照字段,在发送 RPC 之前有值,但在接收 RPC 时没有值。很明显是字段首字母没有大写,从而没有将字段导出。

③ 报错 apply error: server 0 apply out of order, expected index 1, got 10。很明显是应用层没有读取到快照,在我的代码中,该错误的触发条件是连续两次崩溃重启。原因是 peer 重启时没有读取快照,那么在之后的 persist 就会将已有的快照覆盖为 nil,然后第二次重启应用层没有读取到快照,从而产生错误。④ 索引越界,当 AppendEntries RPC 响应 success = false 时,有可能 reply.XIndex 位置的日志已经被丢弃,需要特判。

问题

在调试时发现,如果 leader 转变为 follower 之后,对应的 election timeout 正好结束,那么由于该 follower 没有设置最近收到的有效 RPC 的时间戳,所以会直接转变为 candidate,发起选举。我的想法是,可以在 leader 转变为 follower 时,重置超时时间。但是,要允许重置就需要在 Raft 结构体中额外添加状态。

在处理 RPC 响应之前,到底应该检查什么假设,一般会检查任期是否和发送时一致。但是,如何判断其他状态是否需要做一致性检查?处理 AppendEntries RPC 响应之前,如果 nextIndex 改变,只要任期没变,也依然可以处理该响应,因为这是正确的。为什么任期改变,就不能继续处理响应?因为当前 peer 可能已经不是 leader,或者虽然是 leader,但是 nextIndex 已经重新初始化,继续处理该响应会造成错误。

处理 RequestVote RPC 响应之前,如果 len(rf.log) 改变且任期没变,是否可以处理该响应?len(rf.log) 改变,说明当前 peer 收到有效的 AppendEntries RPC,它已经和 leader 取得联系转变为 follower,所以不需要处理该响应。类似的问题还可以提出很多,总之确定应该检查什么状态,来维持程序的正确性,需要仔细思考。

疑问:有可能收到当前快照的前缀快照吗?发送快照消息给 applyCh 时,解锁是否有可能产生错误?更新 nextIndex 时,由于 RPC 的并发性,总是需要取最大值来更新?之前处理 AppendEntries RPC 的响应时,并发返回 success = false,是否有可能不正确的递减 nextIndex,甚至使其为负数?

总结

实验有很多细节点,有时以为自己知道该怎么做,但在编写代码却没有完全按照准则说的做,往往会遗漏某个前提条件或者某个操作,以及理解错某个概念。在做每一步之前,最好问一下自己,是否应该这样做,有什么特殊情况。日志对调试的帮助很大,我还是第一次打印这么大量且详细的日志,来调试并发程序。代码中同时存在多个错误真的很难搞,有时修复一个错误之后仍然报错,那么有可能是修复时引入的错误,或者之前代码中遗留的错误。在测试的后半部分,有种面向测试查错的感觉,个人感觉不是很好,最好还是能自己顺着逻辑找出错误。如果隐藏测试代码,估计得花费更长时间查错。


创建 go-test-many.sh 脚本,使用命令 bash ./go-test-many.sh 1000 8 开始并行测试。测试 Test (3C): Figure 8 (unreliable) ... 失败一次,报错 config.go:601: one(9042) failed to reach agreement。日志表明,follower 没有在限定时间内和 leader 同步。由于我的代码在某些情况下,只会传递 nextIndex 位置的日志,而不是之后的所有日志,此时会有同步过慢的问题,简单修改一下就可以解决。