System Design(草稿)

参考 GitHub Awesome System Design

短链系统

我的想法

核心功能就是长链转短链,可以从使用算法和存储方式两个方面思考。算法首先想到的是哈希函数,可以将长链转为一个短链整数(需要处理哈希冲突),然后使用 Base62 编码为包含 [0-9a-zA-z] 的短链字符串。存储时可以存储长链和短链整数/字符串,不同存储格式主要是时间和空间的权衡,然后需要分别在两个字段上建立唯一/前缀索引来加速查询。

实际上可以将数据库的主键 ID 作为短链整数,这样空间占用更小,也不存在哈希冲突。查询短链到长链的映射可以直接利用主键索引,只需给长链建立索引来加速反向查询。虽然每次查询都要将短链整数编码为短链字符串,不过时间开销不算大,长度为 7 的字符串就可以表示 \(62^7\approx3\times 10^{12}\) 个不同的短链,而且可以使用缓存提升查询性能。

文章内容

需求分析可以分为功能性需求和非功能性需求,功能性需求主要是业务需求,非功能性需求包括高性能、高可用、可扩展性和安全性。然后可以假设流量特征来估算系统的吞吐量、磁盘/内存空间以及带宽需求。流量特征可以从每日缩短 URL 的请求数量、读写比例、峰值流量和平均 URL 长度等方面思考。最后根据估算结果来估计需要的基础设施。

假设每天 100 万个缩短 URL 的请求,读写比例 100 : 1,峰值流量是平均负载的 10 倍,平均 URL 长度为 100 个字符。计算吞吐量,平均写吞吐量 \(\frac{\times 10^{6}}{24\times3600}\approx 12\) WPS,平均读吞吐量 \(100\times 12=1200\) RPS。假设每个 URL 占用 127 字节的存储空间,由短链 7 字节、长链 100 字节、创建时间 8 字节、到期时间 8 字节和点击次数 4 字节组成,那么每年就需要 \(127\times 10^{6}\times 365\approx 46.4\) GB 的磁盘存储空间。假设读取返回的 HTTP 301 重定向大小为 500 字节,每天读取带宽为 \(500\times 10^{6}\times 100= 50\) GB/day,每秒峰值读取带宽为 \(500\times RPS\times 10= 6\) MB/s。

系统负载是读多写少的,假设负载符合二八定律(Pareto principle),即 20% 的 URL 产生 80% 的读取流量。假设每天缓存 20% 的 URL,则需要 \(127\times 10^{6}\times 0.2=25.4\) MB 的内存作为缓存。假设缓存命中率为 90%,则到达数据库的平均请求数量为 \(1200\times0.1=120\) RPS。最后根据以上估算结果,估计使用 4-6 个服务器,每个服务器能够处理 200~300 RPS,10~20 个分布式数据库节点,3~4 个分布式缓存节点。

由于需要存储数十亿条记录,而且数据库操作大多是简单的键值查找,不需要执行复杂的表连接,所以选择使用 DynamoDB 或 Cassandra 等 NoSQL 数据库,因为它们能够高效处理数十亿次简单键值的查找,而且提供高可扩展性和高可用性。

可以对长链使用哈希函数(MD5、SHA-256)得到哈希值,然后取哈希值的前缀转十进制,再使用 Base64 编码得到短链。要处理哈希冲突,可以使用不同的种子重新哈希或者取哈希值的其它位生成短链,也可以将增量后缀附加到短链。另一种生成短链的方式是使用分布式 ID(数据库自增 ID、Redis 生成 ID、雪花算法),从安全性角度思考,增量 ID 是可预测的,有人可以通过使用短链服务推断系统的 URL 总数或者猜测其他用户生成的 URL,混淆(Obfuscation)增量 ID 可以缓解该问题。

额外功能:① 用户自定义短链,需要进行格式校验以及唯一性检查,如果和已有的短链发生冲突,服务器可以返回错误或者建议的替换方案。服务器还可以保留部分短链,以供内部使用。② 设置短链的过期时间,由用户指定或者使用默认的过期时间,可以使用定时任务或者在请求时检测短链是否过期,然后(逻辑)删除过期短链。③ 可以使用消息队列记录短链的访问日志,然后批量处理日志将其聚合存储在数据仓库中,以实现分析服务。

其他问题:① 使用负载均衡、基于范围/哈希的分片、缓存保证可扩展性。② 使用复制、自动故障转移、跨多个地理区域部署服务保证可用性。③ 处理边界条件,例如对于过期短链服务器返回 HTTP 401 状态码,不存在的短链返回 404 状态码,在生成短链时检测冲突。④ 限流防止滥用,输入校验确保用户提供的短链不包含恶意内容,使用 HTTPS 防止窃听和中间人攻击,对异常活动模式进行监控和报警。

遗漏的点

功能性需求没有想到重定向和过期时间的设计,以及允许用户自定义短链、数据分析等额外功能。重定向可以使用 301/302 状态码(永久/临时重定向),临时重定向每次请求都会经过短链服务,从而可以统计短链的使用次数。没有特别思考非功能性需求,基本上是从单服务器的角度思考问题。对哈希函数有点误解,误认为哈希的结果都是整数,像是 MD5、SHA 生成的哈希值都是可以包含字母的。我之前都是默认使用 MySQL + Redis 的组合,没有想到使用其他 NoSQL 数据库。使用哈希函数索引实现长链快速判重,使用布隆过滤器过滤请求。

一些疑问

Q:为什么估计使用 4-6 个服务器,10~20 个分布式数据库节点,3~4 个分布式缓存节点?

Q:文章描述的系统似乎只会检测短链冲突,而没有检测重复的长链?

Q:DynamoDBCassandra 数据库有什么特点?

排行榜系统

我的想法

排行榜主要是按照分数对用户进行排序,功能性需求主要是显示排名和更新排名。像是游戏打完排位会显示排名上升多少,然后排行榜只会显示 TopK 用户,每隔固定时间才更新榜单。像是算法比赛的排行榜会显示所有用户,榜单也是实时更新的。对于 TopK 榜单一般会使用小顶堆来实现,当新分数大于堆顶元素时,就弹出堆顶元素然后将当前元素入堆。

那么其他用户如何在分数变化之后计算排名呢?简单的想法就是直接更新 MySQL 数据库中的分数,然后获取所有小于该分数的用户数量。对于游戏这种分数频繁变化的场景,更新、查询数据库的磁盘 I/O 开销较大。如果能够将所有用户的分数信息缓存到 Redis 中,则可以直接使用跳表维护排行榜,然后使用消息队列异步更新数据库(类似后写缓存策略)。

像是算法比赛这种结束之后排名就不会变化的场景,数据库中也可以存储排名信息(而不只是分数)。如果内存中存不下所有用户的分数信息,可以对数据分片使用多个 Redis 节点维护排行榜,获取排名就需要对所有节点数据做聚合。应该也可以使用统计数据估算排名,返回给前端的是估算排名而不是真正的排名,然后后台使用定时任务去修正排名。

文章内容

排行榜分为绝对排行榜和相对排行榜,绝对排行榜显示 TopK 用户,相对排行榜显示指定用户周围的用户。HTTP 响应可以设置 cache-control: private, no-cache, must-revalidate, max-age=5 使用缓存,状态码 202 表示异步处理请求、401 表示身份未认证、403 表示没有权限,使用 HEAD 请求查看服务的健康状况。

功能性需求:绝对和相对排行榜,可以查看指定用户的排名,排行榜可以划分为全球、区域和朋友圈,可以查看历史比赛的排行榜,可以根据每天/周/月的游戏情况进行排名,客户端以分布式的方式在全球范围内更新排行榜。流量特征:每天 5000 万个写入请求,读写比例为 5 : 1,用户分布在全球各地,排行榜需实时显示。

客户端使用 WebSocket 协议和负载均衡器建立连接,保证排行榜的实时性(允许服务器主动推送数据)。依然使用 DynamoDB 实现可扩展性,将玩家 ID 作为分区键、玩家得分作为排序键。东西有点多,看得有点懵,先实现再说。

遗漏的点

单个用户的分数变化会导致其他用户的排名也发生变化(导致缓存失效),所以使用 MySQL 数据库获取排名的方式需要全表扫描更新所有用户的排名,或者延迟更新每个用户的排名,磁盘 I/O 开销会比我之前想的更大。更新 MySQL 分数之后应该主动更新缓存而不是删除缓存(从 Cache Aside 变为类似 Write Through),确保所有用户都在 Redis 的排行榜中(所以也不能设置过期时间),避免重建整个排行榜。不过主动更新缓存可能导致不一致,例如网络原因导致缓存更新失败、并发更新的顺序交错导致旧数据覆盖新数据,可以使用消息队列排队、重试请求保证正确性。

一些疑问

Q:既然游戏和用户的关系是一对多,即不同游戏的用户不会共享,那么完全没有必要将不同游戏的数据耦合到一个数据库中。而且 Redis 维护的排行榜数据以 leaderboard_id 作为键是有问题的,这可是 Leaderboards 表的主键。

LSM in a Week

参考 mini-lsmDiscordRust DocumentationTour of Rust

Week 1 Overview: Mini-LSM

Memtables

如果要在内存中存储键值对,可以简单的使用哈希表,但是如果要支持顺序遍历以及范围查询,要使用特殊的数据结构,例如跳表、B/B+ 树、红黑树、AVL树等。为什么 LSM 实现使用跳表(crossbeam_skiplist)作为内存表的数据结构? B/B+ 树主要是针对磁盘 I/O 做优化,插入/删除会涉及页分裂和合并,在纯内存的场景下不太适用。红黑树是近似平衡的,AVL 树是严格平衡的,所以红黑树的插入/删除效率更好,而 AVL 树的查找效率更好。跳表的实现更加简单,而且并发友好,只需要锁定局部数据。不过跳表是基于链表实现的,数据局部性可能较差,而且概率数据结构稳定性也会差点。

为什么内存表在 delete 时只是将值置为空,而不是删除整个键值对? 因为如果当前内存表找不到指定的 Key,那么会继续在已经冻结的内存表中找,如果删除整个键值对,则会丢失该删除操作。为什么要有多个内存表呢? 当前内存表执行 get/put 操作的同时,可以基于冻结的内存表构建 SST,然后刷到磁盘中,而不会 STW 阻塞整个 LSM。

何时使用 state 读写锁和 state_lock 独占锁? 由于使用的跳表是支持无锁并发的,读写跳表只需要加 state 读锁,但是判断是否冻结以及整个冻结操作都需要加 state_lock 独占锁来保证原子性,可以使用双重检查加锁来优化性能。然后冻结操作内会加 state 写锁,将当前内存表加入冻结列表,然后创建一个新的内存表。在内存表超出限制大小需要冻结时,将 state 读锁释放再加写锁,与直接将读锁升级为写锁有什么区别? 在释放读锁之后,依然允许其他线程执行 get/put 操作,此时当前冻结线程可以执行昂贵的磁盘 I/O 而不会影响整体的性能。

Merge Iterator

可以构建迭代器来遍历内存表,像是 Java 中的迭代器通常会有 hasNext()next() 方法来判断迭代器的有效性以及获取下一个元素。StorageIterator 迭代器的设计有点不同,next() 只会移动内部的跳表迭代器而不会返回元素,获取元素是通过 key()value() 方法实现的。为什么 StorageIterator 迭代器的设计和 Rust 风格的迭代器不同?为什么 MemTableIterator 需要自引用? 因为 StorageIterator 迭代器可能需要多次获取当前 Key 而不移动到下一个元素,所以将移动操作和取值操作分离是比较好的选择。使用自引用结构的目的是避免编写 Rust 的生命周期代码的复杂性。

实现单个内存表迭代器之后,因为除当前内存表外还有多个冻结内存表,所以需要一个合并迭代器,将所有内存表迭代器合并,从而实现顺序遍历。如果有多个内存表包含相同的键,需要返回该键的最新值,以及跳过其他包含旧值的键。可以使用堆来对多个迭代器排序,首先按照键的大小排序,然后按照内存表的新旧编号排序。

1
2
3
iter1: b->del, c->4, d->5
iter2: a->1, b->2, c->3
iter3: e->4
1
a->1, b->del, c->4, d->5, e->4

Rust 的 BinaryHeap 在修改堆中元素之后,会自动排序保证堆的有效性。如果在修改之后迭代器报错,不能直接返回错误信息,而要首先从堆中移除失效的迭代器,从而避免访问失效的迭代器(会调用其方法获取 Key)。

1
2
3
let Some(mut inner_iter) = self.iters.peek_mut() {
inner_iter.next()?; // <- will cause problem
}

实现合并迭代器 MergeIterator 之后,外部还有一个 LsmIterator 做包装,用来过滤被逻辑删除的键,也就是在遍历时会自动跳过值为空的键。然后 LsmIterator 外部又有一个 FusedIterator 做包装,在底层迭代器报错之后,会避免再次访问底层迭代器。为什么不直接在 MemTableIterator 中过滤被逻辑删除的键? 在合并迭代器时需要知道该键在新内存表被删除避免读取到旧内存表的值。

Block

当内存表数量超过系统限制时,会将内存表作为 SST 刷新到磁盘,一个 SST 由多个块组成,单个块的大小通常是 4KB。编码方式如下:键值对是变长编码,作为 Entry 存储在 Data Section,同时维护每个 Entry 的偏移量来支持二分查找,最后存储总的 Entry 数量从而支持定位 Offset Section 的起始位置。

1
2
3
4
5
----------------------------------------------------------------------------------------------------
| Data Section | Offset Section | Extra |
----------------------------------------------------------------------------------------------------
| Entry #1 | Entry #2 | ... | Entry #N | Offset #1 | Offset #2 | ... | Offset #N | num_of_elements |
----------------------------------------------------------------------------------------------------
1
2
3
4
5
-----------------------------------------------------------------------
| Entry #1 | ... |
-----------------------------------------------------------------------
| key_len (2B) | key (keylen) | value_len (2B) | value (varlen) | ... |
-----------------------------------------------------------------------

Sorted String Table (SST)

SST 由存储在磁盘上的数据块和索引块组成,数据块是按需加载的,只有在用户请求时才会加载到内存中。索引块也可以按需加载,但在本项目中假设所有 SST 索引块(元数据块)都能装入内存。一个 SST 文件的大小通常是 256MB,在构建 SST 时最好预分配 Vec 的空间,避免频繁动态扩容的开销。 如果单个数据块的大小超出限制,会自动创建一个新块存储数据。SST 的元数据包括,每个数据块的第一个和最后一个键以及该数据块的偏移量。最后存储元数据的偏移量从而支持定位元数据的起始位置。

1
2
3
4
5
-------------------------------------------------------------------------------------------
| Block Section | Meta Section | Extra |
-------------------------------------------------------------------------------------------
| data block | ... | data block | metadata | meta block offset (u32) |
-------------------------------------------------------------------------------------------

在实现 SsTableIteratorseek_to_key 方法时(找到第一个 ? >= x 的键),可以根据数据块的第一个键使用二分查找来快速定位目标数据块。需要注意的是,如果目标 Key 是 x,要找的是最后一个满足 first_key <= x 的数据块,而且在调用该 BlockIteratorseek_to_key 方法之后,还要额外判断该 BlockIterator 是否失效。例如,查找 b 会定位到第一个数据块(正确),而查找 d 也会定位到第一个数据块(错误),然后在 BlockIterator 上调用 seek_to_key 会导致迭代器失效,此时需要跳到下一个块上,也就是说第二个数据块才是目标块。

1
2
3
4
5
--------------------------------------
| block 1 | block 2 | block meta |
--------------------------------------
| a, b, c | e, f, g | 1: a/c, 2: e/g |
--------------------------------------

可以使用缓存来加速数据块的访问,项目使用 moka-rs 作为缓存库来实现块缓存,块由 (sst_id, block_id) 唯一标识。

Read Path & Write Path

实现完成内存表和 SST 结构之后,可以实现 TwoMergeIterator 来合并两者的迭代器,理论上可以直接使用之前实现的 MergeIterator,不过这里只有两个迭代器需要合并,所以这里使用一个更简单的实现。然后可以修改 LsmStorageInnerscanget 方法的实现,将 SST 添加到读取路径中。由于创建 SST 的迭代器会比较耗时(涉及磁盘 I/O),所以不要在整个过程持有 state 读锁,而是创建一个 state 快照,然后基于快照创建迭代器来访问数据。

创建 SST 的合并迭代器需要加载所有底层 SST 的第一个块,优化方式是并行创建迭代器。在 LsmStorageInner::scan 方法中,实际上可以利用参数 lowerupper,以及 SST 的 first_keylast_key 来过滤 SST,避免不必要的磁盘 I/O。当内存表的数量达到上限时,需要将最旧的冻结内存表以 SST 文件的形式刷到磁盘,刷新工作主要由后台线程负责。

1
2
3
4
let snapshot = {
let guard = self.state.read();
Arc::clone(&guard)
};

Snack Time: SST Optimizations

可以在读取路径上集成布隆过滤器来过滤 SST,布隆过滤器基于 SST 包含的数据创建,每个 SST 都包含一个持久化的布隆过滤器数据。由于数据块中的键是有序存储的,可以对键进行前缀压缩。根据相邻键进行键前缀压缩而不是根据块中的第一个键进行压缩有哪些优点/缺点? 项目中是根据数据块的 first_key 的前缀进行压缩,这样可以在查找时快速复原当前 Key。如果是根据相邻键压缩,那么压缩比例会更高,但是压缩和复原最坏需要遍历所有前缀数据。

1
2
3
4
5
6
-----------------------------------------------------------------------------------------------------
| Block Section | Meta Section |
-----------------------------------------------------------------------------------------------------
| data block | ... | data block | metadata | meta block offset | bloom filter | bloom filter offset |
| | varlen | u32 | varlen | u32 |
-----------------------------------------------------------------------------------------------------
1
key_overlap_len (u16) | rest_key_len (u16) | key (rest_key_len)

Week 2 Overview: Compaction and Persistence

Compaction Implementation

可以对 L0 SST 文件进行压缩,避免读放大,由于各个 SST 之间的键范围存在重叠。最简单的方式是进行 Full Compaction,将所有键按顺序排序存储到一组新的 SST 中(被称为 sorted run)。压缩时允许并行执行读写操作,所以不持有 state 的读写锁,依然复制状态快照(不会复制 SST,只是复制状态),然后基于快照创建 L0 和 L1(之前压缩的 L1 需要和 L0 再次整合)的合并迭代器。

使用 SsTableBuilder 构建 SST,如果超出大小限制则拆分文件,在迭代时过滤已经被删除的键,重复的键已经在合并迭代器内部过滤。替换状态时只需要持有 state_lock 独占锁,state 读写锁依然只在复制状态快照时持有,允许并行执行读写内存表的操作。替换状态不会丢失内存表的修改,因为状态只包含内存表的指针。

由于在压缩时也会产生 L0 SST 文件,所以在替换状态时只删除被压缩的 L0 SST 文件、以及所有 L1 SST 文件。在 macOS/Linux 操作系统上,直接删除文件不会有问题,即使可能存在并行读取,因为操作系统只有在没有文件句柄被持有时才会真正删除该文件。在 Windows 上似乎会直接报错。 删除文件操作在解锁 state_lock 之后执行。

Simple Compaction Strategy

实现 Simple Leveled Compaction 策略,基本想法是指定一个最大层数 max_levels(不包括 L0),然后在 L0 SST 文件数量达到阈值,或者下层文件数量和上层(>= L1)文件数量的比值小于阈值时执行相邻层级之间的压缩。是否可以像 Full Compaction 一样直接过滤已经被删除的键? 只有当压缩 Level 是最底层时,才能过滤掉已经被删除的键,否则可能读取到下层 Level 的旧值。主动选择旧 Level 进行压缩,即使其没有达到指定阈值,是否是一个好主意? 参考 Lethe 文章,定期压缩可以减少无效键导致的空间放大、压缩无效键导致的写放大、无效键占用布隆过滤器导致误报率提升。

Tiered Compaction Strategy

实现 Tiered Compaction 策略(RocksDB 的 Universal Compaction),该压缩不会使用 L0,所有 SST 都直接存放到 levels 中,压缩过程中生成的 SST 总是会占据一个 tier,tier 的编号就是该 tier 包含的第一个 SST ID。

首先,只有当 tier 的数量超过阈值才会触发压缩。当估计的空间放大率 all levels except last level size / last level size 超过阈值会触发 Full Compaction,或者 this tier size / sum of all previous tiers size 超过阈值会触发前缀压缩。如果没有触发上述压缩,则会直接触发前缀压缩,压缩的最大层数由参数限制。由于压缩过程中也会生成 SST,所以修改状态时替换的是 levels 的中间部分而不一定是前缀。

Leveled Compaction Strategy

实现 Leveled Compaction 策略,相当于对 Simple Leveled Compaction 的优化。之前的策略会在相邻层级之间压缩数据,初始时会反复地将上层数据压缩到下面的空层中,更好的策略是跳过中间的空层,直接和有数据的低层进行压缩。在数据量不是很大的时候,没有必要分层压缩产生读放大,所以给最底层设置一个阈值 base_level_size_mb,在最底层的文件大小超过该阈值之前总是将数据压缩到最底层。

然后使用 level_size_multiplier 来确定各个层级的 target size,只允许有一个小于 base_level_size_mb 的正 target size,该层记作 base level。每次压缩 L0 的数据都直接压缩到 base level,这样可以确保在数据量比较小时使用较少的层级(后缀层级),避免较多层级产生读放大、以及反复压缩相邻空层产生写放大。

优先将满足 current_size / target_size > 1.0 的比值最大的层级压缩到下层中,尽量减少空间放大。之前的策略总是将上层所有 SST 和下层做压缩,这样压缩过程中的空间放大会比较多。可以实现部分压缩,从上层选择最旧的 SST,然后和下层键范围重叠的 SST 做压缩。对于 L0 可以计算整个 L0 的最小最大键,然后和下层范围重叠的 SST 做压缩。需要考虑没有重叠的情况,此时压缩结果需要放在 Level 的开头或者末尾,比较简单的实现方式就是根据 SST 的 first_key 排序。

Manifest

可以持久化 LSM 存储引擎的状态,从而允许重启时恢复。简单的方法是,在状态被修改时将所有状态持久化,但是这样磁盘 I/O 开销较大,特别是修改操作比较频繁时。该项目使用 Manifest 文件维护 SST Flush 和 Compaction 操作记录,SST Flush 操作记录该 SST 的 sst_id,Compaction 操作记录压缩任务 task 和压缩结果 output。在不持有 state 读写锁时追加 Manifest 文件,允许并行读写内存表。

1
| JSON record | JSON record | JSON record | JSON record |

由于记录的信息有限,在重启恢复时只能先构建 L0levels 元数据,然后根据这些元数据读取磁盘上的 SST 来构建 sstables 哈希表。如果在压缩过程中使用到 sstables 中 SST 的 first_key 排序,那么在恢复时需要特殊处理。不能在应用记录的过程中构建 sstables,因为记录中的 SST 可能已经被压缩,从而对应的文件已经从磁盘删除。

为保证磁盘中 SST 和 Manifest 文件的一致性,需要在追加 Manifest 文件之前 Sync 整个存储目录,或者可以在修改文件之后总是执行 Sync。 在关闭 LSM 之前,需要将所有内存表刷新到 SST 中。在恢复时需要维护最大的 max_sst_id,恢复完成之后需要根据 max_sst_id + 1 创建当前的内存表。由于 Manifest 文件会记录所有操作,所以需要定时创建快照来截断日志(例如 Raft 中状态机快照),来减少空间占用以及加快恢复速度。

Write-Ahead Log (WAL)

Manifest 文件可以保证正常关闭时,LSM 状态的持久性,但是如果发生崩溃,就需要依靠 WAL 来保证持久性。每个内存表对应一个 WAL 文件,如果冻结内存表被刷新到 SST 文件中,则可以将 WAL 文件删除,前提是该 SST 文件已经被 Sync 到磁盘中(默认在创建 SST 时就会 Sync)。

1
| key_len | key | value_len | value |

为了能够在重启时恢复内存表,需要在 Manifest 文件中维护 NewMemtable 记录,该记录存储内存表的 ID。为减少磁盘 I/O 以及系统调用开销,可以在将当前内存表转移到冻结内存表时刷新 WAL 到磁盘,而不是在每次修改内存表时刷新,不过如果宕机则会丢失当前内存表的数据。创建 WAL 文件应该在追加 Manifest 记录之前,这样可以保证恢复时的一致性。在重启恢复时,依然不能在应用记录的过程中读取 WAL,因为记录中的 WAL 可能已经被删除,由于该内存表已经被刷新到 SST 中。

Snack Time: Batch Write and Checksums

实现批量写入接口,允许用户提供一组 putdelete 操作批量执行。给 Block、SST Meta(Block Meta、Bloom Filter)、WAL 以及 Manifest 文件添加校验和,从而允许检查数据是否损坏。

1
2
3
4
5
6
---------------------------------------------------------------------------------------------------------------------------
| Block Section | Meta Section |
---------------------------------------------------------------------------------------------------------------------------
| data block | checksum | ... | data block | checksum | metadata | meta block offset | bloom filter | bloom filter offset |
| varlen | u32 | | varlen | u32 | varlen | u32 | varlen | u32 |
---------------------------------------------------------------------------------------------------------------------------
1
2
3
4
5
6
----------------------------------------------------------------------------------------------------------
| Meta Section |
----------------------------------------------------------------------------------------------------------
| no. of block | metadata | checksum | meta block offset | bloom filter | checksum | bloom filter offset |
| u32 | varlen | u32 | u32 | varlen | u32 | u32 |
----------------------------------------------------------------------------------------------------------
1
| key_len | key | value_len | value | checksum |
1
| len | JSON record | checksum | len | JSON record | checksum | len | JSON record | checksum |

Week 3 Overview: Multi-Version Concurrency Control

Timestamp Key Encoding + Refactor

Snapshot Read - Memtables and Timestamps

Snapshot Read - Engine Read Path and Transaction API

为实现 MVCC 需要重构代码,首先替换 Key 的表示以包含时间戳字段,按照 user_key 升序 ts 降序排序,这样可以优先读到新版本的数据。然后修改内存表、Block、SST、WAL 以支持时间戳字段,在重启恢复 LSM 状态时需要根据 SST 和内存表中使用的最大时间戳来确定当前时间戳。在整个 write_batch 中需要持有 MVCC 的 write_lock,从而确保按照时间戳递增的顺序执行写入操作(只有写入操作会递增时间戳)。

1
2
Alternative key representation: | user_key (varlen) | ts (8 bytes) | in a single slice
Our key representation: | user_key slice | ts (u64) |
1
key_overlap_len (u16) | remaining_key_len (u16) | key (remaining_key_len) | timestamp (u64)
1
| key_len (exclude ts len) (u16) | key | ts (u64) | value_len (u16) | value | checksum (u32) |

在压缩过程中,暂时忽略 compact_to_bottom_level 保留所有版本的数据(不执行实际的删除操作)。确保不同时间戳的相同 user_key 在同一个的 SST 中,即使超出 SST 的大小限制,从而简化其他功能的开发。个人认为主要是 Leveled Compaction 会从上层部分压缩到下层,如果多个版本不在一个 SST 中,会存在旧版本的键值对在上层而新版本的键值对被压缩到下层的情况。修改 LsmIterator 包含时间戳字段,只迭代 <= 当前时间戳的最新数据,如果键被删除就跳过该键的所有旧版本。应该首先判断版本再判断删除,因为删除操作可能是在快照之后的版本执行的。

get 读取路径中,只需要根据当前时间戳创建事务,读取快照内可见的数据即可。不过由于内存表会包含多个版本的数据,所以只能使用范围查询来查找该 user_key 的最新可见版本。在 scan 读取路径中,如果 lowerupperBound::Included,则对应的时间戳是 tsTS_RANGE_END。如果是 Bound::Excluded,则对应的时间戳是 TS_RANGE_ENDts | TS_RANGE_BEGIN。因为对于 lower 而言,必须定位到对应 user_key_begin 的最旧版本,才能确保迭代器跳过该 user_key_begin。对于 upper 而言,必须定位到 ts 及更新的版本才能确保迭代器跳过该 user_key_end

Watermark and Garbage Collection

Watermark 是维护系统中 lowest_read_ts 的结构,可以在创建/终止事务时,使用 BTreeMap 维护各个时间戳的引用计数来实现。然后压缩操作保留所有键的最新快照以及所有 >= lowest_read_ts 的快照,例外情况是如果当前压缩到 compact_to_bottom_level,且键的最新快照是删除操作,且该快照 <= lowest_read_ts,则不保留该键值对到新 SST 中。

Transaction and Optimistic Concurrency Control

事务内执行的修改操作都存储到事务的 local_storage 本地内存表中,该表不包含时间戳信息。读取操作会先读取本地内存表,再读取公共内存表及 SST 文件。本地内存表迭代器 TxnLocalIterator 和之前没有时间戳的公共内存表实现相同,而 TxnIterator 会使用 TwoMergeIterator 合并本地和公共数据,不过需要跳过被删除的键,由于之前只有 LsmIterator 会执行该逻辑。

提交事务会将当前事务的状态设置为已提交,然后将本地内存表的数据通过 LsmStorageInner::write_batch 批量提交到公共内存表中。如果在写入公共内存表的过程中崩溃,恢复时的 WAL 无法区分事务之间的边界,从而不能保证事务的原子性。可以实现批量 WAL,将事务内的写入操作批量提交给 WAL,然后添加 Header 和 Footer 来区分事务边界。而且要保证批量写入操作都存储到相同的内存表中,从而使得事务内的批量操作只存储在一个 WAL 文件中。

1
2
3
|   HEADER   |                          BODY                                      |  FOOTER  |
| u32 | u16 | var | u64 | u16 | var | ... | u32 |
| batch_size | key_len | key | ts | value_len | value | more key-value pairs ... | checksum |

(A Partial) Serializable Snapshot Isolation

在事务内执行读写操作时维护读写集,然后在提交时验证当前事务是否和已提交事务冲突(OCC),从而实现可串行化快照隔离。首先,需要确保验证、提交操作的原子性,所以在 commit 方法开始时持有 MVCC 的 commit_lock 锁。然后遍历在 (read_ts, expected_commit_ts) 范围内提交的事务,判断当前事务的读集是否和这些事务的写集重叠。如果重叠则说明当前事务和已提交事务冲突,直接返回错误信息。例外情况是,如果当前事务是只读的,则不需要验证冲突。基本想法是,写操作会基于读操作的结果执行,而只读则不会修改公共数据。

在本项目中,对于 scan 操作只将扫描到的键加入读集而不是扫描范围,这样实现简单但是无法真正保证可串行化快照隔离(依然存在 write skew 异常)。在读写集中存储的是键的哈希值,这样可以减少内存空间占用、加快验证速度,但是会存在哈希冲突导致误报。在提交事务时可以将提交时间戳小于 lowest_read_ts 的已提交事务数据移除,然后将当前事务数据加入已提交事务集合。

1
2
3
4
txn1: len(scan(..)) = 2
txn2: len(scan(..)) = 2
txn1: put key1 = 2, commit, read set = {a, b}, write set = {key1}
txn2: put key2 = 2, commit, read set = {a, b}, write set = {key2}

Snack Time: Compaction Filters

如果需要删除以某个字符串为前缀的所有键,正常调用删除方法不但不会减少空间占用,反而会为每个键新增一个删除版本,只有压缩到最底层才会真正删除。可以使用压缩过滤器,添加前缀匹配规则到压缩过滤器,然后在压缩过程中不保留匹配的键值对到 SST 中。不过保证正确的前提是,所有事务都不会读取相关的键。

Guava RateLimiter

参考 RateLimiter 代码。

基本使用

1
2
3
4
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
1
2
RateLimiter rateLimiter = RateLimiter.create(1);
rateLimiter.acquire();

代码实现

抽取 SmoothBursty 限流器的关键代码,梳理基本的实现流程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
class RateLimiter {

/**
* 最多可以存储多少秒的许可来应对突发流量
*/
private final double maxBurstSeconds;

/**
* 允许存储的最大许可数量
*/
private final double maxPermits;

/**
* 生成许可的间隔时间
*/
private final double stableIntervalMicros;

/**
* 当前存储的许可数量
*/
private double storedPermits;

/**
* 允许下一个请求获取许可的时间
*/
private long nextFreeTicketMicros = 0L;

/**
* 每秒生成的许可数量
*/
RateLimiter(double permitsPerSecond) {
this.maxBurstSeconds = 1.0;
maxPermits = maxBurstSeconds * permitsPerSecond;
stableIntervalMicros = TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;
}

/**
* 获取指定数量的许可,阻塞请求知道可以获取,返回等待的时间(秒)
*/
public double acquire(int permits) {
// 参数范围检查
if (permits <= 0) {
throw new IllegalArgumentException(String.format("Requested permits (%s) must be positive", permits));
}
long microsToWait;
// 加锁保证线程安全
synchronized (this) {
Instant instant = Instant.now();
long nowMicros = instant.getEpochSecond() * 1_000_000 + instant.getNano() / 1000;
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
microsToWait = max(momentAvailable - nowMicros, 0);
}
LockSupport.parkNanos(microsToWait * 1000);
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}

/**
* 返回当前请求允许获取许可的时间,设置当前存储的许可数量,以及下一个请求允许获取许可的时间
*/
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
// 如果当前时间大于 nextFreeTicket,则重新生成当前时间存储的许可数量,以及当前请求允许获取许可的时间
if (nowMicros > nextFreeTicketMicros) {
double newPermits = (nowMicros - nextFreeTicketMicros) / stableIntervalMicros;
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
// 存储当前请求允许获取许可的时间作为返回值
long returnValue = nextFreeTicketMicros;
// 消耗已有的许可数量,然后根据需要新获取的许可数量,生成下一次请求的等待时间
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
double freshPermits = requiredPermits - storedPermitsToSpend;
long waitMicros = (long) (freshPermits * stableIntervalMicros);
// 将等待时间添加到 nextFreeTicketMicros 中,如果溢出则设置为 Long.MAX_VALUE
try {
nextFreeTicketMicros = Math.addExact(nextFreeTicketMicros, waitMicros);
} catch (ArithmeticException e) {
nextFreeTicketMicros = Long.MAX_VALUE;
}
storedPermits -= storedPermitsToSpend;
return returnValue;
}

public static void main(String[] args) {
RateLimiter rateLimiter = new RateLimiter(1);
for (int i = 0; i < 10; i++) {
double wait = rateLimiter.acquire(2);
System.out.printf("等待时间: %fs, 剩余许可: %f\n", wait, rateLimiter.storedPermits);
}
}
}
1
2
3
4
5
6
7
8
// 当前时间大于 nextFreeTicket,生成 maxPermits 个许可
// 请求 2 个许可,当前只有 1 个许可,等待 0 s,下一个请求的等待时间是 1 s
等待时间: 0.000000s, 剩余许可: 0.000000
// 请求 2 个许可,当前只有 0 个许可,等待 1 s,下一个请求的等待时间是 2 s
等待时间: 0.967138s, 剩余许可: 0.000000
// 请求 2 个许可,当前只有 0 个许可,等待 2 s,下一个请求的等待时间是 2 s
等待时间: 1.981604s, 剩余许可: 0.000000
等待时间: 1.989831s, 剩余许可: 0.000000