CMU 15-445/645 — Fall 2024 Projects

参考 课程网站项目地址测试网站Discord 频道。尝试做一下新版的项目。

Project #0 - C++ Primer

Task #1 & Task #2

算法设计

HyperLogLog 主要用于近似计算 multiset 中的基数(不同元素的数量),相比精确计算需要的 \(O(n)\) 空间 \(n\) 为基数的大小),该算法只需要使用很少的内存。

基本想法是,使用哈希函数将多重集合的元素映射到均匀分布的随机数上,然后记录最左或者最右连续 \(0\) 的最大数量。利用哈希值的随机性,某个比特位为零的概率是 \(\frac{1}{2}\),如果最多 \(n\) 个连续的零,则估计多重集合的基数为 \(2^{n}\),因为该哈希值出现的概率是 \(\frac{1}{2^{n}}\)。

不过该算法方差较大,只要有一个哈希值包含很多零,就会严重高估基数。所以,可以将多重集合拆分为多个子集,通常是利用哈希值的前 \(k\) 个比特位确定元素拆分到的桶,然后每个桶内维护剩余比特位中连续零的最大值,取加权平均值 \(2^{\frac{1}{m}\sum_{i=1}^{m}{R_{i}}}\) 得到每个桶的平均基数(其中 \(m=2^{k}\)),然后再乘以 \(m\) 得到多重集合的基数估计值。

额外的优化是使用 constant=0.79402 和调和平均来提升准确率,使用稀疏布局和密集布局来平衡内存开销和准确率,最后计算公式如下。(参考 HyperLogLog in Presto: A significantly faster way to handle cardinality estimationA problem so hard even Google relies on Random ChanceHyperLogLog

$$ CARDINALITY_{HLL}=constant\cdot m\cdot \frac{m}{\sum_{i=1}^{m}2^{-R_{i}}} $$

注意事项

n_bits 的范围可以确定下限是 0,上限不确定,如果很大会内存溢出。② 很多地方使用的是无符号整数,在使用字面量时需要声明后缀(例如 1UL),以及做算术运算时需要避免溢出(例如取负值时需要转为有符号数)。③ 做移位操作 1 << x 时需要注意 x 的大小,如果结果超出表示范围则行为未定义。经过测试不同环境 1 << 32 会得到 0 或者 1。④ 可以使用 ./test/hyperloglog_test --gtest_filter=HyperLogLogTest.EdgeTest1 执行指定的测试。

Project #1 - Buffer Pool Manager

Task #1 - LRU-K Replacement Policy

设计思路

简单的实现,使用单个哈希表维护 frame_idLRUKNode 的映射关系,LRUKNode 负责维护该页面的最后 k 次访问时间戳,在淘汰时通过扫描整个哈希表来确定需要淘汰哪个 frame_id,淘汰的时间复杂度为 \(O(n)\)。如果使用两个链表分别维护 <k>=k 的 Frame 访问顺序,如果不考虑 Pin,那么可以 \(O(1)\) 时间实现淘汰。但是由于不知道 Frame 是否被 Pin,所以仍然需要遍历链表查找第一个未被 Pin 的 Frame。也可以使用类似 TreeMap 的结构,直接存储未被 Pin 的所有 Frame,按照次数、时间戳排序,淘汰的时间复杂度是 \(O(\log{n})\),不过每次访问都需要重新删除插入来保证排序。(实际上都差不多,因为 BPM 的主要瓶颈在磁盘 I/O)

Task #2 - Disk Scheduler

设计思路

比较有意思的设计就是毒丸(poison pill),队列中存储的是 std::optional<bustub::DiskRequest> 类型的元素,向队列中添加 std::nullopt 元素,表示终止工作线程的执行。

Task #3 - Buffer Pool Manager

设计思路

缓冲池的基本功能就是获取 Frame,可以从空闲列表或者淘汰 Frame 来获取。如果是通过淘汰获取,则需要删除 Frame 和 Page 的映射关系、将脏页刷盘以及重置 FrameHeader。然后可以将 Frame 和 Page 关联,需要设置 Frame 和 Page 的映射关系、设置 FrameHeader 的成员变量、从磁盘读取数据、调用 LRUK 相关函数。

最后可以将该 Frame 和 PageGuard 关联,由于 PageGuard 会获取 Frame 的读写锁,所以要在创建 PageGuard 之前解锁 BPM 独占锁,从而避免 BPM 阻塞在 Frame 的锁上。例如,典型的情况是有两个线程同时对相同的 Page 获取 WritePageGuard。

线上测试就只有 BufferPoolManagerTest.SchedulerTest (0/0) 没有过,花几个小时找 BUG 都没找到,最后还是简单暴力打印日志解决问题。只能说别想太多,问题出在 is_dirty_ 的设置位置不对,我在 GetDataMut 方法中设置,而即使不调用该方法,NewPage 得到的空白页面也算是脏页,所以要在构造函数中设置。而且,调用 FlushPage/FlushAllPages 之后不应该将脏页标志重置,因为只要 WritePageGuard 还在那么页面依然可能变脏。(如果在刷盘的过程中修改页面,或许会产生问题,应该在刷盘之前获取 Frame 的读写锁,但是要避免和 BPM 独占锁产生死锁,这个点暂时不做)

Leaderboard Task (Optional)

方案一(×)

简单的想法是在磁盘 I/O 时不持有 BPM 锁,而是直接解锁然后在磁盘 I/O 结束之后加锁。但是在如下交错下,相同的页面会占用多个 Frame(Problem #1)。解决方式也有,就是在解锁 BPM 之前更新 Page 和 Frame 之间的映射关系。但是会有新的并发问题,也就是之后的线程判断 Page 在缓冲池中,然后直接读写该 Frame,而此时之前的脏页还没有刷盘,Page 数据也没有读取到 Frame 中(Problem #2)。

1
2
3
Problem #1
thread A -> fetch page 1 -> bpm lock -> evict -> bpm unlock (t1) -> dirty flush
thread B -> fetch page 1 -> bpm lock (t2) -> evict (error)
1
2
3
Problem #2
... -> bpm lock -> evict -> update map -> bpm unlock (t1) -> dirty flush
... -> bpm lock (t2) -> exist -> bpm unlock -> frame lock -> read/write page (error)

所以需要在解锁 BPM 之前给 Frame 加独占锁,但是这样做需要调整代码结构来避免死锁,也就是把 BPM 级别的共享变量的更新都放在解锁 BPM 之前,从而避免之后再加锁 BPM。这里给 Frame 加锁不会像之前一样阻塞 BPM,因为此时的 Frame 不被任何 PageGuard 持有。如果 Page 在缓冲池中,设置 Replacer 相关的数据一定要持有 BPM 锁,因为淘汰时只会加 BPM 锁,要避免页面在淘汰之后立即被访问的情况。(特殊情况可以不加,但是要注意如何设置值)

1
2
3
Problem #3
... -> frame lock -> bpm unlock (t1) -> dirty flush -> bpm lock (deadlock)
... -> bpm lock (t2) -> exist -> bpm unlock -> frame lock (deadlock) -> read/write page
1
2
3
Problem #4
... -> bpm lock -> exist page 1 -> frame lock -> bpm unlock (t1) -> access page 1 (t4)
... -> bpm lock (t2) -> evict page 1 (t3) -> ...

这样还是有问题,就是淘汰的 Frame 在包含脏页刷盘时会解锁 BPM,而此时如果有线程获取该 Page,则会从磁盘读取到旧页面或者此时磁盘中没有该页面(如果该页面是 NewPage)。此时需要额外维护哈希表存储相关信息,使用额外的锁来保护,该锁需要在解锁 BPM 之前持有,否则依然会发生上述错误。如果当前读取的 Page 还没有完成刷盘,则直接从旧 Frame 复制到当前 Frame 中。可以发现使用这种解锁 BPM 的方案会有各种问题,这都是我上次实现时遇到过的,最后 QPS 从 3700+ 反向优化到 1000+。

1
2
3
Problem #4
... -> bpm lock -> evict page 1 -> bpm unlock (t1) -> dirty flush
... -> fetch page 1 -> bpm lock (t2) -> disk read

方案二(×)

主要瓶颈在单个 BPM 独占锁,可以使用类似 Java 的 ConcurrentHashMap 的思路,将 BPM 锁拆分成多个锁。由于不能使用外部现有的并发库,那么可以使用哈希表数组,根据 Page 的编号映射到不同哈希表中。如果需要同时锁定多个分区,则需要按照顺序加锁来避免死锁,可以使用 std::scoped_lock。还有很多坑点和方案一类似,最后 QPS 从 3700+ 反向优化到 900+。

1
2
3
Problem #1
thread A -> bpm lock 1 -> bpm lock 2 (deadlock)
thread B -> bpm lock 2 -> bpm lock 1 (deadlock)

方案三(√)

可以不使用额外的锁,而是设置标志位表示该页面正在刷盘或者读取中,例如在 Page 到 Frame 的哈希表中记录无效的 Frame 编号,然后其他获取线程使用条件变量等待执行完成。需要注意等待的条件是哈希表中不存在该 Page 映射或者对应 Frame 编号有效,因为如果空闲列表为空且无法淘汰其他 Frame,则需要手动删除该临时键值对而不会设置有效的 Frame 编号。此时可以在磁盘 I/O 时解锁 BPM,之后加锁修改标志位,然后唤醒线程。

如果磁盘调度器不使用线程池,则 QPS 反向优化到 900+。如果使用单队列 + 8 线程,则 QPS 优化到 5300+。实际上不使用磁盘调度器的后台线程,而是直接调用 Schedule 方法,QPS 可以到 14700+。使用单队列 + 16 线程,则 QPS 优化到 14000+,因为工作线程总共有 16 个。

如果为每个线程分配一个队列,任务循环放置到每个队列,则 QPS 只有 11000+。有延迟场景性能更低可能是因为没有任务窃取机制,某个线程没被调度导致相应队列的任务积压。但是无延迟场景的 QPS 更接近不使用后台线程的情况,因为各个线程不必竞争相同队列的锁。(该优化会有问题,如果存在对同一页面的多个磁盘 I/O 请求,可以想到的情况是主动 Flush 脏页,会存在多个对相同页面的写请求,无法保证顺序)

由于测试使用 8 个 Scan 线程顺序读,8 个 Get 线程随机写(使用 Zipfian 分布)。由于第三个测试结果的权值最大,主要优化第三个场景,也就是顺序读每 0.1ms 执行一次,随机写每 1ms 执行一次。理论上最大 QPS 是 88000,目前看来 Scan 操作的 QPS 优化空间较大。

1
scan_qps_large / 1000 + get_qps_large / 1000 + scan_qps_small / 1000 + get_qps_small / 1000 + scan_qps_1ms + get_qps_1ms

经过测试,Scan 操作的缓存命中率都是 0%,使用优先淘汰策略(不论优先淘汰什么类型)的 Scan 操作 QPS 可以提升到 30000+。使用默认的 LRUK 淘汰策略、优先淘汰只被 Scan 访问的 Frame、优先淘汰只被 Get 访问的 Frame 的 Get 操作命中率和 QPS 分别是 7% 5800+、11% 4300+ 和 3% 6500+。

虽然按理说优先淘汰只被 Scan 访问的 Frame 比较正常,因为 Scan 线程是顺序读本身就无法利用缓存,提前淘汰不会影响性能。基于单队列 + 16 线程的代码,可以将 QPS 从 14000+ 优化到 42000+。但是实际上优先淘汰任意访问类型的 Frame 都可以得到差不多的 QPS,因为只要提升 Evict 的速度,就可以大幅提升 Scan 操作的 QPS。另外,命中率更高的 Get 操作 QPS 反而更低,真不知道什么原因。

排名

Rank Submission Name scan_qps_large get_qps_large scan_qps_small get_qps_small scan_qps_1ms get_qps_1ms QPS
98 ALEX 41513 4156 48125 643 3433 248 3776
24 ALEX 19407 19844 17793 21289 2491 2788 5358
16 ALEX 107110 104764 99692 102739 8370 5930 14715
18 ALEX 13775 13869 14001 16822 8148 5829 14035
22 ALEX 70177 70478 65910 73478 5775 5049 11105
4 ALEX 19866 10945 20708 13994 37939 4325 42330

Lab 5: Sharded Key/Value Service

课程网站实验网站实验指南

Part A: The Controller and Static Sharding

实现

基本上就是复制 lab4 的代码,实现平衡分片到服务器的代码会花点时间,像在做算法题。

测试

使用命令 time go test -race -run 5A -count 1 -failfast -timeout 0 开始测试:

① 测试 TestMulti,报错 test_test.go:61: Shards wrong。日志显示,不同状态机的 Shards 数组包含相同的元素,但是顺序不同。可以想到,错误原因在于改变状态机的代码是非确定性的,由于提示中有说 map 遍历顺序的不确定性,所以我们可以很快定位错误。要使遍历顺序具有确定性,只好将 map 中的键放在数组中,排序之后遍历。

Part B: Shard Movement

实现

假设副本组 G1 负责的分片 S1 需要移动到副本组 G2。在 G1 发送分片之后,以及 G2 接收分片之前,都不应该处理和该分片相关的客户端请求,不论是新收到的请求还是已经在 applyCh 中的请求。G1 应该返回 ErrWrongGroup,G2 应该使用条件变量等待分片的到达。和 lab4 相同,如果服务器发现其已经不是 leader,则不应该继续等待。而且,G2 必须将接收的分片使用 Raft 应用到状态机之后,才能返回响应。为了方便的发送和删除分片,服务器应该按分片来存储数据。如果服务器需要发送多个分片,可以简单地串行发送,更好的做法是,并发地向不同副本组发送分片,在单个 RPC 中包含所有需要发向该副本组的分片。

如果在 G2 更新配置之前,G1 发送的分片到达 G2,该如何处理?在请求中应该包含配置编号,如果 G2 发现更大的配置编号,则使用条件变量等待配置更新,再处理该请求。如果 G1 发生故障,G2 可能会收到重复的分片,在将分片应用到状态机之前,应该限制该分片所属的配置编号和当前配置相同,并且使用副本组编号和配置编号的组合来去重。如果分片的配置编号小于当前配置编号,则直接响应 OK

对于客户端请求,必须实现跨副本组的重复检测。例如,副本组 G1 已执行客户端的请求但未响应,然后 G1 向 G2 发送分片,最终客户端会向 G2 发送重复的请求。简单的解决方案是,G1 不仅发送分片,还发送用于重复检测的数据,所以重复检测的数据最好也是按照分片来存储。

服务器需要定期检查配置是否变更,在一个副本组中,只有 leader 会检查,还是所有副本都会检查?应该是只有 leader 检查配置变更,然后使用 Raft 将变更复制到其他副本中。因为不同副本检查配置变更的时机不同,服务器拒绝 applyCh 中相关请求的时机也就不同,从而状态机会不一致。在将配置变更应用到状态机之后,才开始发送分片,从而确保不同副本都会发送相同的分片。

在检查配置变更时,需要根据配置编号获取下一个配置,而不能直接获取最新配置,因为在未检查期间配置可能变更多次。如果 Raft 选举出新的 leader,它会检查下一个配置变更将其添加到日志,若此时通道中已经存在旧 leader 提交的一个/多个配置变更的日志,有可能会导致不正确的覆盖。所以,在将变更应用到状态机之前,应该总是限制配置编号是单调递增的。

配置必须线性地变更,如果配置变更多次,之后的变更必须等待之前的变更完成。可以将等待发送和接收的分片记录到状态中,然后使用条件变量检查状态来确定之前的配置是否完成。特别的,如果配置的编号为 1,则不需要记录状态。如果 leader 在变更配置的过程中崩溃,新的 leader 需要知道当前配置是否变更完成,所以上述状态应该使用 Raft 复制到副本中。同时,创建一个线程定期检查(或者使用条件变量),如果当前服务器是 leader,则发送状态中记录的待发送分片,发送完成之后删除对应分片。

测试

使用命令 time go test -race -run 5B -count 1 -failfast -timeout 0 开始测试:

① 在将配置应用到状态机之后,没有拒绝 applyCh 中的相关请求。 ② 在处理发送分片的 RPC 的响应时,忘记检查假设,需要确保当前配置编号和发送 RPC 时相同。 ③ 读取快照的语句放在初始化之前,导致错误的状态。 ④ Op 中的 map 会和 Rafte.Encode(rf.log) 竞争,所以总是使用 map 的副本。 ⑤ 在修改状态时,没有唤醒条件变量。 ⑥ 有些状态只有 leader 会修改,所以 follower 需要特殊处理。

⑦ 如果 leader 接收分片,使用 Raft 复制到多数副本,在返回响应之后所有副本都崩溃。重启之后,由于 Raft 对提交之前任期的日志条目存在限制,该日志无法提交。此时,客户端请求该分片的数据将会阻塞直到日志应用,而日志无法提交除非在当前任期提交一个日志。解决方案就是,在重启之后总是添加一个空日志条目,来推进 commitIndex。之前测试有发生过低概率的报错,我当时就怀疑是 Raft 的实现有问题。

总结

Part B 有点折磨,有些很蠢的错误,花费很长时间才定位到。例如 ⑥,主要是太相信某个部分不会出错,结果就是那个地方有问题。例如 ①,以为已经实现某个功能,实际上存在遗漏,在定位错误时就不会往那个方面想。原因在于,没有在完成部分功能时做相关的测试,从而将发现错误的时间延后,此时代码已经有很多其他逻辑,定位错误会更加困难。错误 ②③④⑤ 是细节上的遗漏,因为功能实现并不是线性的过程,添加某个功能会修改已经实现的部分,逻辑交互在一起难免会有遗漏,特别是在代码组织很糟糕的情况下。整体来说,程序的并发运行会产生意想不到的交错,特别是存在网络或者机器故障的情况下,真的很难在最开始就写出正确的代码。

Lab 4: Fault-tolerant Key/Value Service

课程网站实验网站实验指南

单个客户端只会串行发送 RPC,但服务器会收到多个客户端的 RPC。

Part A: Key/value service without snapshots

实现

实现 ClerkGetPutAppend 方法。如果不知道谁是 leader,则迭代地向每个服务器发送 RPC。直到某个服务器回复操作成功,此时记录该服务器为 leader,以免每次发送 RPC 都需要重新确定 leader。如果标记为 leader 的服务器回复操作失败或者超时未回复,依然需要迭代所有服务器。因为 leader 随时可能变化,甚至是在迭代的过程中,所以需要在迭代的外层添加一个无限循环。

实现 KVServerGetPutAppend 方法。基本的结构是,首先调用 rf.Start 方法,如果不是 leader 则响应 ErrWrongLeader。否则,在正常情况下,我们等待通道中出现对应的命令,将命令应用到状态机,然后返回响应。由于 rf.Start 方法不保证会提交指定的命令,我们需要一种能够检测当前服务器已经不是 leader 的方法,从而避免客户端阻塞在非 leader 服务器上。方式如下,可以定时调用 rf.GetState 检查当前服务器的任期是否和之前调用 Start 得到的任期不一致,或者在通道中出现和请求不匹配的消息。

对于每个服务器,我们应该使用一个线程来读取 ApplyMsg,不断地将已提交的命令应用到状态机。我们不能等待 RPC 去读取对应的 ApplyMsg,因为只有 leader 会收到 Clerk 的 RPC,此时其他服务器需要自动同步。需要注意,不是所有提交的命令都会应用到状态机,因为会存在重复的请求。例如,当前 leader 在提交日志之后崩溃,客户端没有收到响应,然后向新的 leader 发送重复的请求,如果此时命令没有被应用到状态机,依然会调用 rf.Start 方法,重复的请求会出现在日志中。

我们需要为每个客户端记录其最后应用到状态机的命令对应请求的标识,来过滤重复的请求。特别的,Get 请求不会对状态机产生影响,并且将 Get 请求放入日志只是为保证单个客户端视图的单调性,所以我们不需要过滤重复的 Get 请求,也就不需要保存上次 Get 请求的结果,而总是应该返回最新的结果。对于每个 Clerk,只缓存上次请求的 RequestId 是否有可能产生错误?例如,某个很早之前发送的 RPC 到达 KVServer,而对应的 RequestId 已经被覆盖。不会发生该情况,TCP 会保证发送顺序和到达顺序一致。

测试

使用命令 time go test -race -run 4A -count 1 -failfast -timeout 0 开始测试:

① 测试无法停止,查看日志发现,客户端请求阻塞在非 leader 服务器上。通过定时检查服务器的任期,可以解决该问题。

Part B: Key/value service with snapshots

实现

什么时候检查大小?可以定时检查。如果超过大小,该执行什么操作?调用 rf.Snapshot。什么时候读取快照?在服务器启动时,以及从通道中。特别的,如果 maxraftstate = -1,则不需要快照。通道中的快照是否有可能比服务器的当前状态更小?

测试

使用命令 time go test -race -run 4B -count 1 -failfast -timeout 0 开始测试:

① 报错 panic: runtime error: slice bounds out of range [-190:]。Raft 发送快照给服务器,而服务器也发送快照给 Raft,状态交叠从而产生错误。要解决该问题,第一个想法是在 Raft 向服务器发送快照时不释放锁,但是有可能产生死锁。第二个想法是,在服务器从通道接收快照消息之后,再更新 Raft 的状态。但是还是不行,如果服务器读取快照消息却没有应该到状态机,然后服务器在 Raft 更新状态之前,调用 persister.RaftStateSize 检查大小,此时 Raft 更新状态,之后服务器调用 rf.Snapshot 方法传递一个更小的快照,必然会产生索引越界。或者,服务器调用 rf.Snapshot 方法传递一个更大的快照,然后 Raft 更新状态,也会索引越界。而且,在之后更新状态必须重新检查假设,很麻烦。简单起见,依然在解锁之前更新状态,同时在 rf.Snapshot 中过滤更小的快照。

② 测试 TestSnapshotRecover4B,报错 test_test.go:148: duplicate element x 0 333 y in Append result。定位错误很慢,日志太长不好看,看半天看不出问题。原因是服务器从通道读取快照之后,通道中有快照已经包含的命令消息,所以会重复应用命令。在应用消息到状态机之前,需要限制消息的日志索引必须大于最后应用到状态机的日志索引。其实只要查看修改状态机的关键代码,应该是可以很快找到错误的。

③ 测试 TestSnapshotRecoverManyClients4B,报错 test_test.go:293: get wrong value, key 19。错误的信息表明状态缺失,定位错误很慢,主要日志太多,打印的信息也不全,经常需要添加打印信息然后重新测试,果然应该在最开始就打印所有信息。错误的原因在 Raft 代码,没有限制快照消息在日志消息之前发送,服务器会提前收到快照之后的命令,使得服务器在收到快照时会丢弃快照,因为最后应用的命令索引大于快照的最后索引。解决方案很简单,只需要做一下限制就行。

总结

每次做实验都会遇到两个难点,一个是功能实现,一个是定位错误。

关于功能实现:例如,如何实现将 ApplyMsg 应用到状态机?最开始,我是想让每个 RPC 去读取消息,然后将其应用到状态机。但是,思考之后,会发现不可行,然后又去想其他方案。实现方式的设计依赖于对整个交互逻辑的正确理解,难点在于处理异常情况,像是 leader 变更、服务器崩溃。

关于定位错误:总是应该在日志中包含所有信息,避免需要添加信息然后重新测试的情况,会非常痛苦。定位错误不要只看日志,一定要先思考可能是哪个部分有问题,比如说状态机的状态有问题,那么在什么情况下会修改状态?找到修改状态的代码,然后思考这段代码在什么情况下会有问题。如果代码没有问题,那么想一想是不是外界的输入有问题,什么样的输入会产生该错误?然后,我们就可以找到外部依赖的代码,确定是否有可能产生该输入序列。