Threads and RPC

参考 Go FAQcrawler.gokv.gonote

Exercise: Web Crawler

串行爬虫

就是 DFS。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func Serial(url string, fetcher Fetcher, fetched map[string]bool) {
if fetched[url] {
return
}
fetched[url] = true
urls, err := fetcher.Fetch(url)
if err != nil {
return
}
for _, u := range urls {
Serial(u, fetcher, fetched)
}
return
}

并行爬虫

共享 + 锁

在 DFS 的基础上,同时启动多个线程来爬取网页,通过使用锁保护共享变量,使用计数器等待所有线程执行完成。如果网页很多,可能会创建非常多的线程,可以通过使用线程池限制线程的数量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
type fetchState struct {
mu sync.Mutex
fetched map[string]bool
}

func (fs *fetchState) testAndSet(url string) bool {
fs.mu.Lock()
defer fs.mu.Unlock()
r := fs.fetched[url]
fs.fetched[url] = true
return r
}

func ConcurrentMutex(url string, fetcher Fetcher, fs *fetchState) {
if fs.testAndSet(url) {
return
}
urls, err := fetcher.Fetch(url)
if err != nil {
return
}
var done sync.WaitGroup
for _, u := range urls {
done.Add(1)
go func(u string) {
defer done.Done()
ConcurrentMutex(u, fetcher, fs)
}(u)
}
done.Wait()
return
}

func makeState() *fetchState {
return &fetchState{fetched: make(map[string]bool)}
}

通道(channel)

使用 channel 实现同步,不需要 DFS,没有任何共享变量,自然也不需要使用锁(尽管 channel 内部会使用锁)。代码的组织方式有点像 MapReduce。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func worker(url string, ch chan []string, fetcher Fetcher) {
urls, err := fetcher.Fetch(url)
if err != nil {
ch <- []string{}
} else {
ch <- urls
}
}

func coordinator(ch chan []string, fetcher Fetcher) {
n := 1
fetched := make(map[string]bool)
for urls := range ch {
for _, u := range urls {
if fetched[u] == false {
fetched[u] = true
n += 1
go worker(u, ch, fetcher)
}
}
n -= 1
if n == 0 {
break
}
}
}

func ConcurrentChannel(url string, fetcher Fetcher) {
ch := make(chan []string)
go func() {
ch <- []string{url}
}()
coordinator(ch, fetcher)
}

Go RPC

远程过程调用(Remote procedure call,RPC),指程序调用的过程在远程计算机执行。通常表现为客户端向服务器发送请求,服务器向客户端返回响应。数据在通过网络传输时需要进行序列化和反序列化,序列化是将数据转换为可以存储或传输的格式的过程,反序列化是序列化的逆过程。远程过程调用和本地过程调用的区别在于,如何处理失败的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
//
// Client
//

func connect() *rpc.Client {
client, err := rpc.Dial("tcp", ":1234")
if err != nil {
log.Fatal("dialing:", err)
}
return client
}

func get(key string) string {
client := connect()
args := GetArgs{"subject"}
reply := GetReply{}
err := client.Call("KV.Get", &args, &reply)
if err != nil {
log.Fatal("error:", err)
}
client.Close()
return reply.Value
}

func put(key string, val string) {
client := connect()
args := PutArgs{"subject", "6.824"}
reply := PutReply{}
err := client.Call("KV.Put", &args, &reply)
if err != nil {
log.Fatal("error:", err)
}
client.Close()
}

//
// Server
//

type KV struct {
mu sync.Mutex
data map[string]string
}

func server() {
kv := &KV{data: map[string]string{}}
rpcs := rpc.NewServer()
rpcs.Register(kv)
l, e := net.Listen("tcp", ":1234")
if e != nil {
log.Fatal("listen error:", e)
}
go func() {
for {
conn, err := l.Accept()
if err == nil {
go rpcs.ServeConn(conn)
} else {
break
}
}
l.Close()
}()
}

func (kv *KV) Get(args *GetArgs, reply *GetReply) error {
kv.mu.Lock()
defer kv.mu.Unlock()

reply.Value = kv.data[args.Key]

return nil
}

func (kv *KV) Put(args *PutArgs, reply *PutReply) error {
kv.mu.Lock()
defer kv.mu.Unlock()

kv.data[args.Key] = args.Value

return nil
}

其他问题

Q:为什么闭包函数可以使用外部函数的变量?

A:闭包函数引用的外部变量存储在堆中,所以当外部函数返回时,变量依旧存在,垃圾收集器会根据引用计数判断是否回收该变量。注意,如果需要引用循环中的变量,需要通过参数传递(值传递),因为外部变量会随循环变化。

Q:假设有两个线程,线程 A 使用条件变量等待某个条件,线程 B 在达到条件时通知线程 A。如果线程 B 在解锁之前通知线程 A 会有问题吗?

A:如果在解锁之前唤醒线程 A,假设线程 A 在 B 解锁之前被调度,那么线程 A 在获取锁时会被阻塞。之后的某个时刻,操作系统会调度线程 B 解锁,此时线程 A 可以重新被调度。注意,条件变量和锁的阻塞队列是独立的,所以不需要再次通知线程 A。

第 379 场力扣周赛

移除后集合的最多元素数

题目

输入长度为偶数 \(n\) 的数组 \(a\) 和 \(b\),输出从 \(a\) 和 \(b\) 中分别选择一半元素构成的集合的最大大小。

数据范围:\(1\leq n\leq 2\times 10^{4}\)。

思路

要使集合尽可能大,肯定优先选择除 \(a\) 和 \(b\) 交集以外的元素,假设分别为 \(x\) 和 \(y\),则可以选择 \(s=\min(x,\frac{n}{2})+\min(y,\frac{n}{2})\) 个不同元素。此时还有 \(n-s\) 个元素待选,假设交集的大小为 \(z\),则答案为 \(s+\min(n-s,z)\)。

执行操作后的最大分割数量

题目

输入长度为 \(n\) 的字符串 \(s\) 和一个整数 \(k\),输出至多改变一个字符时,执行操作能够得到的最大分割数。每次操作可以分割 \(s\) 的最多包含 \(k\) 个不同字符的最长前缀。

数据范围:\(1\leq n\leq 10^{4}\),\(1\leq k\leq 26\)。

思路

首先,很容易想到暴力做法,枚举每个位置的所有改变情况,然后通过遍历求分割数,时间复杂度为 \(O(n^{2}|\Sigma|\log{|\Sigma|})\)。显然,可以优化的部分就是最后遍历求分割数的 \(O(n\log{|\Sigma|})\)。然后观察修改字符相比不修改字符会产生什么变化,可以发现修改字符所在的分割段的长度可能发生变化,而前缀的分割数是固定的。可以想到预处理原字符串每个位置 \(i\) 的后缀分割数,问题就剩下如何快速求得修改字符所在段的右端点。由于字符数随着长度的增加而增加,所以可以通过二分求得该段的右端点,这还需要花费 \(O(n|\Sigma|)\) 的时间提前预处理出字符数的前缀和。最后,分割数为前缀 + 中间 + 后缀的段数。代码实现时,还有很多其他细节需要注意,强烈建议自己实现一下。

Hello 2024

Grouping Increases

题目

输入长度为 \(n\) 的数组 \(a\),将数组 \(a\) 分割为两个子序列(可能为空),输出两个子序列中满足 \(b_{i}<b_{i+1}\) 的下标 \(i\) 的数量之和的最小值。

数据范围:\(1\leq n\leq 2\times 10^{5}\),\(1\leq a_{i}\leq n\)。

思路

贪心。假设将数组 \(a\) 分割为数组 \(b\) 和 \(c\),从空数组开始,将 \(a\) 中的元素添加到 \(b\) 或 \(c\)。假设 \(b\) 和 \(c\) 的最后一个元素分别为 \(x\) 和 \(y\)(\(x\leq y\)),如果 \(a_{i}\leq x\) 或 \(a_{i}>y\),则将 \(a_{i}\) 添加到 \(b\),否则添加到 \(c\)。