Lab 1: MapReduce

课程网站实验网站实验指南

准备工作:复习 Lecture 1 & 2 的内容,主要是 MapReduce 的论文、Go 教程和 RPC 包的使用。

交互逻辑

首先需要明白,Coordinator 和 Worker 的关系,以及 Map/Reduce Task 和 Map/Reduce 函数的关系。MapReduce 由一个 Coordinator 和多个 Worker 组成,Coordinator 负责将 Task 分配给空闲的 Worker,可以是 Map Task 或 Reduce Task。Map/Reduce Task 负责调用应用程序提供的 Map/Reduce 函数,对数据进行处理。

最开始我以为是由 Coordinator 管理所有 Worker,主动分配任务。结果代码的结构是由 Worker 向 Coordinator 发送 RPC 请求,Coordinator 被动的接收请求,然后根据当前的情况分配任务。仔细想想,确实应该这样,Worker 是可以随意增减的,它并不由 Coordinator 管理。

Coordinator 不仅是接收分配任务的请求,在一个 Map Task 完成时,它也会收到该 Task 生成的中间文件的信息。而且 Reduce Task 完成时,也应该通知 Coordinator,从而 Coordinator 可以确认所有是否已完成所有 Reduce Task。

数据结构

明白整体的交互逻辑之后,需要思考一下 Coordinator 存储什么数据,使用什么数据结构。根据论文 3.2 节的描述,需要存储每个 Map/Reduce Task 的状态(包括 idle、in-progress 和 completed),以及非 idle 状态的 Task 所在的 Worker 标识,还有 Map Task 生成的中间文件的位置和大小。在我们的实现中,就只需要 Map/Reduce Task 的状态和文件路径。当然,中间文件是根据哈希函数划分给 Reduce Task 的,那么存储时也应该能够知道,某个文件由哪个 Map Task 生成,又应该交给哪个 Reduce Task 处理。

而且论文提到,中间文件是以增量的方式发送给对应 Reduce Task 的,这也是为什么需要记录非 idle 状态的 Task 所在的 Worker 标识。但是,这样实现起来比较复杂,所以我们不会记录 Worker 标识,也不会去增量的发送 Reduce Task 所需的文件,而是等待所有 Map Task 完成之后,再分配 Reduce Task 给 Worker。

然后考虑并发访问数据结构的问题,我们可以先使用一个粗粒度的锁来实现 MapReduce,测试结果正确之后,再尝试使用更细粒度的锁,这样可以避免并发时发生非并发错误,从而难以调试。

更多细节

考虑一下分配 Task 的细节,分配的先后顺序已经确定是优先分配 Map Task,所有 Map Task 的状态为 completed 之后,再分配 Reduce Task。那么如何实现呢?首先,我们会分配状态为 idle 的 Map Task 给 Worker,如果没有则等待所有 Map Task 完成。如何等待,根据实验网站的提示,我们可以使用 time.Sleep() 或者 sync.Cond。那么之后呢,等待之后该做什么?如果依然没有完成,我们应该继续等待吗?显然不行,如果某个 Map Worker 崩溃,也就是该实验中预定的超时 10 秒钟,此时如果剩余的 Worker 都在等待,那么 Task 将永远都无法推进。所以,我们应该重新考虑是否需要分配 Map Task,判断 Reduce Task 是否全部完成也是类似的逻辑。

根据上面的讨论,我们又可以问,如何确认 Task 是否完成,是使用 for 循环遍历吗?如果是,那么当 Map Task 全部完成时,如何避免重复的遍历?使用一个标志位或许可以解决,那么应该在哪里遍历呢,似乎可以有两种策略,一种是在分配 Reduce Task 之前遍历,另一种是在 Worker 的 Map Task 完成时遍历。如果不使用 for 循环,能否使用计数器记录 Map Task 完成的数量?如果判断为“崩溃”的 Map Worker 其实没有崩溃,是否会导致额外的计数?这都取决于具体的实现方式。

考虑到代码的可读性,如何表示 Task 的状态和类型?我首先想到的是使用枚举类,但是 Go 中实现枚举的方式比较特别,参考Go 实现枚举。以上基本上是讨论 Coordinator 如何处理 Worker 的请求,那么 Worker 的实现其实是比较简单的,不需要考虑太多。只要根据请求的返回数据,判断是执行 Map Task 还是 Reduce Task 即可。当然,根据实验网站的提示,我们可以添加一个 pseudo-task 来表示所有 Task 已完成,从而使 Worker 退出请求 Task 的循环。

实验小结

准备工作很重要,很久之前看的 MapReduce 论文,对各种概念的定义不是很清楚,直接看代码就很懵。然后,我个人的经验就是不要提前优化,以及先使用粗粒度锁排除非并发问题,然后再使用更细粒度的锁。测试时只遇到两个小问题,一个是导出字段没有使用大写字母开头,另一个是中间文件名的保存位置搞错。根本就没遇到并发问题啊,有点出乎意料。代码编写时我感觉脑子有点乱,就是会想很多问题,一定记住先实现基本的逻辑,再处理额外的逻辑。

简单看下 Tai-e 源码

官方文档代码仓库ISSTA 2023 论文静态程序分析框架“太阿”的设计之道

已完成全部作业,简单看下 Tai-e 科研版是如何实现作业的,以及作业涉及的部分接口和类的源码。

在分析的实现中,Stream 使用的比较多,然后就是某些比较好用的 API 在作业中没有注意到,代码中也有使用高版本 Java 的某些新特性。基本上每个包都有 package-info.java 文件,描述当前包的关键信息,还是很不错的。我没怎么看和作业关系不大的源码,它涉及到的东西比较多,需要额外的理论知识。

IR

FieldRef

特点:私有构造函数和静态工厂方法;Record Classes;缓存实例对象;向方法传递 Runnable 类型的对象来实现回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class FieldRef extends MemberRef {

private static final ConcurrentMap<Key, FieldRef> map =
Maps.newConcurrentMap(4096);

static {
World.registerResetCallback(map::clear);
}

public static FieldRef get(
JClass declaringClass, String name, Type type, boolean isStatic) {
Key key = new Key(declaringClass, name, type);
return map.computeIfAbsent(key, k -> new FieldRef(k, isStatic));
}

private FieldRef(Key key, boolean isStatic) {
...
}

private record Key(JClass declaringClass, String name, Type type) {
}
}

Var

特点:使用内部类存储和当前 Var 相关的语句,而不是直接包含在当前类中;使用 transientwriteObjectreadObject 显示控制序列化,避免反序列化创建多个单例对象;使用 Collections.unmodifiableList 方法返回不可修改的列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class Var implements LValue, RValue, Indexable {

private transient RelevantStmts relevantStmts = RelevantStmts.EMPTY;

@Serial
private void writeObject(ObjectOutputStream s) throws IOException {
s.defaultWriteObject();
if (relevantStmts == RelevantStmts.EMPTY) {
s.writeObject(null);
} else {
s.writeObject(relevantStmts);
}
}

@Serial
private void readObject(ObjectInputStream s) throws IOException,
ClassNotFoundException {
s.defaultReadObject();
relevantStmts = (RelevantStmts) s.readObject();
if (relevantStmts == null) {
relevantStmts = RelevantStmts.EMPTY;
}
}

private static class RelevantStmts implements Serializable {

private static final RelevantStmts EMPTY = new RelevantStmts();

private static <T> List<T> unmodifiable(List<T> list) {
return list.isEmpty() ? list : Collections.unmodifiableList(list);
}
}
}

Stmt

特点:Stmt 包含以 StmtVisitor 类型作为参数的 accept 泛型方法,可以用于实现访问者模式

1
2
3
4
public interface Stmt extends Indexable, Serializable {

<T> T accept(StmtVisitor<T> visitor);
}
1
2
3
4
5
6
7
8
9
10
11
public interface StmtVisitor<T> {

default T visit(New stmt) {
return visitDefault(stmt);
}

default T visitDefault(Stmt stmt) {
return null;
}
}

DefinitionStmt

特点:使用 <L extends LValue, R extends RValue> 泛型,对表达式进行限定,所有具体的子类都会使用具体的类型替换限定的类型。

1
2
DefinitionStmt<L extends LValue, R extends RValue>
Invoke extends DefinitionStmt<Var, InvokeExp>

Analysis

特点:LiveVariable 的实现,删除变量使用 ifPresent + lambda 处理,代码比较简洁。ConstantPropagationnewBoundaryFact 使用 Stream 来实现,而我是使用普通的方式实现的。Evaluator 类中的 evaluate 方法,使用了扩展的 Switch Expressions,可以省略 breakInterConstantPropagation 实现 Alias-Aware 时,使用访问者模式处理 Load 和 Store 语句。指针分析和作业时区别比较大,没有细看。

核心组成