参考 Go FAQ ,crawler.go ,kv.go ,note 。
串行爬虫
就是 DFS。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 func Serial (url string , fetcher Fetcher, fetched map [string ]bool ) { if fetched[url] { return } fetched[url] = true urls, err := fetcher.Fetch(url) if err != nil { return } for _, u := range urls { Serial(u, fetcher, fetched) } return }
并行爬虫
共享 + 锁
在 DFS 的基础上,同时启动多个线程来爬取网页,通过使用锁保护共享变量,使用计数器等待所有线程执行完成。如果网页很多,可能会创建非常多的线程,可以通过使用线程池限制线程的数量。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 type fetchState struct { mu sync.Mutex fetched map [string ]bool } func (fs *fetchState) testAndSet(url string ) bool { fs.mu.Lock() defer fs.mu.Unlock() r := fs.fetched[url] fs.fetched[url] = true return r } func ConcurrentMutex (url string , fetcher Fetcher, fs *fetchState) { if fs.testAndSet(url) { return } urls, err := fetcher.Fetch(url) if err != nil { return } var done sync.WaitGroup for _, u := range urls { done.Add(1 ) go func (u string ) { defer done.Done() ConcurrentMutex(u, fetcher, fs) }(u) } done.Wait() return } func makeState () *fetchState { return &fetchState{fetched: make (map [string ]bool )} }
通道(channel)
使用 channel 实现同步,不需要 DFS,没有任何共享变量,自然也不需要使用锁(尽管 channel 内部会使用锁)。代码的组织方式有点像 MapReduce。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 func worker (url string , ch chan []string , fetcher Fetcher) { urls, err := fetcher.Fetch(url) if err != nil { ch <- []string {} } else { ch <- urls } } func coordinator (ch chan []string , fetcher Fetcher) { n := 1 fetched := make (map [string ]bool ) for urls := range ch { for _, u := range urls { if fetched[u] == false { fetched[u] = true n += 1 go worker(u, ch, fetcher) } } n -= 1 if n == 0 { break } } } func ConcurrentChannel (url string , fetcher Fetcher) { ch := make (chan []string ) go func () { ch <- []string {url} }() coordinator(ch, fetcher) }
远程过程调用(Remote procedure call,RPC),指程序调用的过程在远程计算机执行。通常表现为客户端向服务器发送请求,服务器向客户端返回响应。数据在通过网络传输时需要进行序列化和反序列化,序列化是将数据转换为可以存储或传输的格式的过程,反序列化是序列化的逆过程。远程过程调用和本地过程调用的区别在于,如何处理失败的情况。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 func connect () *rpc.Client { client, err := rpc.Dial("tcp" , ":1234" ) if err != nil { log.Fatal("dialing:" , err) } return client } func get (key string ) string { client := connect() args := GetArgs{"subject" } reply := GetReply{} err := client.Call("KV.Get" , &args, &reply) if err != nil { log.Fatal("error:" , err) } client.Close() return reply.Value } func put (key string , val string ) { client := connect() args := PutArgs{"subject" , "6.824" } reply := PutReply{} err := client.Call("KV.Put" , &args, &reply) if err != nil { log.Fatal("error:" , err) } client.Close() } type KV struct { mu sync.Mutex data map [string ]string } func server () { kv := &KV{data: map [string ]string {}} rpcs := rpc.NewServer() rpcs.Register(kv) l, e := net.Listen("tcp" , ":1234" ) if e != nil { log.Fatal("listen error:" , e) } go func () { for { conn, err := l.Accept() if err == nil { go rpcs.ServeConn(conn) } else { break } } l.Close() }() } func (kv *KV) Get(args *GetArgs, reply *GetReply) error { kv.mu.Lock() defer kv.mu.Unlock() reply.Value = kv.data[args.Key] return nil } func (kv *KV) Put(args *PutArgs, reply *PutReply) error { kv.mu.Lock() defer kv.mu.Unlock() kv.data[args.Key] = args.Value return nil }
其他问题
Q:为什么闭包函数可以使用外部函数的变量?
A:闭包函数引用的外部变量存储在堆中,所以当外部函数返回时,变量依旧存在,垃圾收集器会根据引用计数判断是否回收该变量。注意,如果需要引用循环中的变量,需要通过参数传递(值传递),因为外部变量会随循环变化。
Q:假设有两个线程,线程 A 使用条件变量等待某个条件,线程 B 在达到条件时通知线程 A。如果线程 B 在解锁之前通知线程 A 会有问题吗?
A:如果在解锁之前唤醒线程 A,假设线程 A 在 B 解锁之前被调度,那么线程 A 在获取锁时会被阻塞。之后的某个时刻,操作系统会调度线程 B 解锁,此时线程 A 可以重新被调度。注意,条件变量和锁的阻塞队列是独立的,所以不需要再次通知线程 A。