笔记01 - Lab 1: MapReduce

综述

什么是MapReduce:MapReduce是一种编程模型,用于大规模数据集的并行运算。软件实现指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对;指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。
MapReduce的应用情况:master的schedule调度函数负责根据任务数分配任务给worker,并同步worker的工作。worker启动时指定Map函数和Reduce函数,接收master调遣时根据任务标志来决定执行Map函数还是Reduce函数。在不同的应用场景下,用户对Map函数和Reduce函数进行自定义,并在启动worker时进行注册即可。

例子1:单词统计程序中:Map任务将读取文件内容,调用Map函数将文件内容映射成一组新的键值对,然后将映射结果输出到对应的Reduce中间文件中。其中,Map(映射)函数将文件内容映射成一组新的键值对,键为单词,值为出现次数。
Reduce任务读取对应的Reduce中间文件中,保证所有相同“单词:次数”键值对共享相同的键组,调用Reduce(归约)函数统计该单词出现的总次数,最后将结果写入到中间文件。

例子2:反向索引生成程序中:Map任务将读取文件内容,调用Map函数将文件内容映射成一组新的键值对,然后将映射结果输出到对应的Reduce中间文件中。其中,Map(映射)函数将文件内容映射成一组新的键值对,键为单词,值为出现文件名。
Reduce任务读取对应的Reduce中间文件中,保证所有相同“单词:文件名”键值对共享相同的键组,调用Reduce(归约)函数统计该单词出现的次数和所有文件名,最后将结果写入到中间文件。

实验内容

准备

1、Go version 1.7
2、参考资料:
Pro Git bookgit user’s manualGit for Computer Scientists
3、项目地址:git://g.csail.mit.edu/6.824-golabs-2017

1
2
3
4
$ git clone git://g.csail.mit.edu/6.824-golabs-2017 6.824
$ cd 6.824
$ ls
Makefile src

实验准备了顺序和分布式两种Map/Reduce模式。顺序模式启动第一个map任务,然后是第二个、第三个…所有map任务完成后启动reduce任务,该方法速度慢,但有利于调试。分布式模式在启动第一个map任务时就并发启动多个worker线程,然后reduce任务,该方法速度快,但不利于实现和调试。

Preamble: Getting familiar with the source

mapreduce提供了简单的Map/Reduce库。master.go的Distributed()函数和Sequential()函数分别提供了分布式模式和顺序模式用于启动job。job包括了以下内容:
1、提供许多输入文件、一个map函数、一个reduce函数、reduce任务的数目nReduce。
2、master将启动rpc服务(master_rpc.go),等待worker的注册(Register() in master.go)。当map task和reduce task可分配时,master分配这些task到对应的worker(schedule() in schedule.go)。
3、master将每个输入文件当成一个map task,启动nMap个map task/worker,每个worker调用doMap()将输入文件的key/value pairs映射到nReduce个中间文件中,映射方法是对每一个pair的key进行哈希。map阶段将产生nMap x nReduce个中间文件,每个worker机器在本地存储nReduce个中间文件。在本实验中,文件存于本机,不同worker都是运行在本机上;但在实际中,每个worker应当被允许读取其他worker写入的文件,可以通过GFS在不同机器上运行worker,并使用分布式存储系统进行存储。
4、master启动reduce阶段(doReduce() in common_reduce.go)。每个map task/worker有nReduce个中间文件,对于reduce task R,R将会收集每个map task/worker的第R个中间文件。reduse阶段将生成nReduce个文件。
5、master启动merge阶段(mr.merge() in master_splitmerge.go),将nReduce个文件合并为一个文件。
6、master为每一个worker发送Shutdown RPC,然后关闭rpc服务。

Part I: Map/Reduce input and output

完成以下内容:
1、为每一个map task切分输出文件,实现doMap() in common_map.go
2、为每一个reduce task收集输入文件,实现doReduce() in common_reduce.go

思路:
1、doMap函数负责一个map task,这里对应读入一个文件内容。然后,调用map function将文件内容处理为KeyValue数组,遍历每一个KeyValue,对Key进行哈希,并输出到对应的reduce中间文件。doMap产生nReduce个中间文件。
2、doReduce函数负责处理所有map task产生的第r个reduce中间文件。读取每一个中间文件,将KeyValue映射到结果集中,相同key的value存于同一个数组中。接着,对key进行排序。最后,针对结果集的每一项key->values[],调用reduce function将values[]连接为字符串new value,并将key->new value写入到reduce task的目标文件中,该文件按key字段排序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func doMap(
jobName string, // the name of the MapReduce job
mapTaskNumber int, // which map task this is
inFile string,
nReduce int, // the number of reduce task that will be run ("R" in the paper)
mapF func(file string, contents string) []KeyValue,
) {
//now we create file
files := make([]*os.File,nReduce)
encs := make([]*json.Encoder,nReduce)
var err error
for i:=0;i<nReduce;i++{
files[i], err = os.Create((reduceName(jobName,mapTaskNumber,i)))
defer files[i].Close()
if err != nil {
panic(err.Error())
}
encs[i] = json.NewEncoder(files[i])
}

//now we read file
data, err := ioutil.ReadFile(inFile)
if err != nil {
panic(err.Error());
return
}
//now we write file
kvs := mapF(inFile,string(data))
for _,kv:=range kvs{
err = encs[ihash(kv.Key)%nReduce].Encode(&kv)
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func doReduce(
jobName string, // the name of the whole MapReduce job
reduceTaskNumber int, // which reduce task this is
outFile string, // write the output here
nMap int, // the number of map tasks that were run ("M" in the paper)
reduceF func(key string, values []string) string,
) {
//now we create file
outF,err := os.Create(outFile)
defer outF.Close()
if err != nil {
panic(err.Error())
}
enc := json.NewEncoder(outF)

//now we read file
kvMap := make(map[string][]string)
for i:=0;i<nMap;i++{
file, err := os.Open((reduceName(jobName,i,reduceTaskNumber)))
defer file.Close()
if err != nil {
panic(err.Error())
}

dec := json.NewDecoder(file)
//now we read file
var kv KeyValue
for{
err = dec.Decode(&kv)
if err != nil {
break;// done with this file
}
kvMap[kv.Key] = append(kvMap[kv.Key],kv.Value)
}
}

//now we sort the immediate result
var keys []string
for k,_:=range kvMap{
keys = append(keys,k)
}
sort.Strings(keys)

//now we write file
for _,key:=range keys{
enc.Encode(KeyValue{key, reduceF(jobName,kvMap[key])})
}
}

测试:

1
2
3
4
$ export "GOPATH=$PWD" 
$ cd "$GOPATH/src/mapreduce"
$ go test -run Sequential
ok mapreduce 2.694s

输出调试结果:

1
2
common.go加入debugEnabled = true
使用$ go test -v -run Sequential

Part II: Single-worker word count

任务:
完成单词计数功能,为main/wc.go提供map function和reduce function。map function主要根据输入的文件内容对单词进行提取和计数,reduce function则是对相同key的value次数进行统计。

思路:
1、单词的提取使用strings.FieldsFunc函数,单词的定义是连续的letter字符串,提取如下:

1
2
3
ss := strings.FieldsFunc(value, func(c rune) bool {
return !unicode.IsLetter(c)
})

2、注意,如果参数输入为go run wc.go master sequential pg-*.txt,golang接收为os.Args[3:],可以自动识别匹配的文件个数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func mapF(filename string, contents string) []mapreduce.KeyValue {
// TODO: you have to write this function
kvs := make([]mapreduce.KeyValue,0)
values := strings.FieldsFunc(contents,func(c rune)bool{
return !unicode.IsLetter(c)
})
for _,v:=range values{
kvs = append(kvs,mapreduce.KeyValue{Key:v,Value:"1"})
}
return kvs
}

func reduceF(key string, values []string) string {
// TODO: you also have to write this function
return strconv.Itoa(len(values))
}

测试:

1
2
3
4
$ cd "$GOPATH/src/main"
$ time go run wc.go master sequential pg-*.txt
$ sort -n -k2 mrtmp.wcseq | tail -10
$ rm mrtmp.*


1
$ bash ./test-wc.sh

Part III: Distributing MapReduce tasks

mapreduce的一大卖点正是它可以将顺序模式代码并行化,而且开发者无需做任何改变。此部分内容将使用RPC来模拟分布式计算(没能够真正地在多台机器上机器上进行Map/Reduce部署)。

master设计思路:
1、暴露mapreduce.Distributed接口。根据参数构造master数据结构,包括rpc地址、done channel、互斥锁、条件变量、文件名数组、net.Listener、rpc shutdown channel等。
2、启动rpc服务。先创建rpc服务,然后注册master结构。接着,master使用net.Listen进行监听,并使用net.Listener.Accept()接收连接,最后使用Server.ServeConn(conn)处理连接。当worker调用master暴露的rpc接口时将产生连接请求。

1
2
3
4
5
rpcs := rpc.NewServer()
rpcs.Register(mr)

conn, err := mr.l.Accept()
rpcs.ServeConn(conn)

其中 Server.Register 用于注册RPC服务,默认的名字是对象的类型名字(这里是Echo)。如果需要指定特殊的名字,可以用 Server.RegisterName 进行注册。被注册对象的类型所有满足以下规则的方法会被导出到RPC服务接口:
func (t *T) MethodName(argType T1, replyType *T2) error。被注册对应至少要有一个方法满足这个特征,否则可能会注册失败。
master导出的服务接口包括func (mr *Master) Register(args *RegisterArgs, _ *struct{}) error(用于worker注册)、func (mr *Master) Shutdown(_, _ *struct{}) error(用于停止自身rpc服务)。
3、调用schedule启动map阶段。在此之前,使用string channel等待worker的注册(sync.Cond.Wait()),一旦注册了worker(sync.Cond.Broadcast()),将其rpc地址写入到string channel中。schedule函数参数包括该string channel,意味着只会使用已注册的worker。当然,如果在启动rpc服务之后、调用schedule之前已经有worker注册了,master将会依次将已注册的worker的rpc地址写入到string channel中,而schedule函数将从该channel中读取。
需要注意的是条件变量的用法,不同于pthread条件变量的初始化(条件变量和搭配使用的互斥锁),golang的初始化只使用了一个参数,sync.NewCond(master)用于初始化master的cond,参数是master结构体,master结构体需包含一个互斥锁和一个条件变量
map阶段将根据输入文件数目确定map task数,然后分配给worker。每个worker负责一个或多个map task,将对应的文件内容哈希到nReduce个文件中。
4、调用schedule启动reduce阶段。该阶段继续使用map阶段的worker和新注册的worker。
reduce阶段将根据nReduce确定reduce task数,然后分配给worker。每个worker读取负责map的worker所保存的第r个文件,将其按照key有序规律输出到统计文件中。
5、关闭与workers的连接,关闭rpc服务。
6、启动merge阶段。按照key有序规律将nReduce个统计文件输出到最终文件中。
7、结束计算。

worker设计思路:
1、暴露mapreduce.RunWorker接口。根据参数构造worker数据结构,包括map function、reduce function、可被rpc调用的次数、rpc地址等。
2、worker向master注册。
3、启动rpc服务。先创建rpc服务,然后注册worker结构。接着,worker使用net.Listen进行监听,并使用net.Listener.Accept()接收连接,最后使用Server.ServeConn(conn)处理连接。当master调用worker暴露的rpc接口时将产生连接请求。RunWorker的最后一个参数表示该worker可被调用的rpc次数,一旦超过次数,worker将关闭监听。
worker暴露的rpc接口有func (wk *Worker) DoTask(arg *DoTaskArgs, _ *struct{}) error(用于master分配工作)、func (wk *Worker) Shutdown(_ *struct{}, res *ShutdownReply) error(用于工作完成时被master调用)。
4、worker结构体需包含一个互斥锁。
5、master可以分配一个或多个task给一个work,但必须等待上一个task完成才能分配下一个task。

schedule设计思路:
1、计算任务数。
2、对每一个任务创建一个goroutine。在每一个goroutine中,通过registerChan阻塞等待可用worker,master将会依次将已注册的worker的rpc地址写入到该channel中,完成任务之后对应的worker也会尝试写入到该channel中。channel的设计满足了“必须等待上一个task完成才能分配下一个task”的需求。
3、使用sync.WaitGroup进行任务同步。master必须等待所有worker完成map阶段之后才进行reduce阶段。
4、对registerChan的写入必须使用goroutine完成,否则map阶段当最后一个worker完成任务时,将会阻塞写入registerChan,而此时没有对registerChan进行读出,map阶段将无法正常完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
var ntasks int
var n_other int // number of inputs (for reduce) or outputs (for map)
switch phase {
case mapPhase:
ntasks = len(mapFiles)
n_other = nReduce
case reducePhase:
ntasks = nReduce
n_other = len(mapFiles)
}

fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)

var wg sync.WaitGroup
for task:=0;task<ntasks;task++{
wg.Add(1)

go func(taskNum int){
//wait until we have available worker
worker := <- registerChan

var arg DoTaskArgs
arg.JobName = jobName
arg.Phase = phase
arg.File = mapFiles[taskNum]
arg.TaskNumber = taskNum
arg.NumOtherPhase = n_other
ok := call(worker, "Worker.DoTask", &arg, nil)
if ok == false {
fmt.Printf("Cleanup: RPC %s error\n", worker)
}
//remember to use goruntine to write data into registerChan
//because last time we write data into it, it will block until someone read it
//however, no one read from it, so the last wg.Done() will not been executed
//and map-phase never finish
go func(){
//again, worker is free
registerChan <- worker
}()
wg.Done()
}(task)
}
wg.Wait()

fmt.Printf("Schedule: %v phase done\n", phase)
}

测试:

1
2
$ cd "$GOPATH/src/mapreduce"
$ go test -run TestBasic

Part IV: Handling worker failures

本部分是需要处理worker出错的现象。MapReduce的worker没有持久状态,一旦worker出错,master发给该worker的rpc包将失败,因此,一旦发现这种现象,master需要将分配给该worker的任务重新分配给其他worker。
RPC失败不一定表示worker不在执行任务,有可能worker已经执行完任务了但是回复丢失了,或者worker正在执行任务但是master的RPC超时。因此,两个worker可能接收相同的task,并进行计算和生成输出。对一个给定的输入,对map和reduce function的两次调用将生成相同的输出,因此不会出现不一致性,后续程序不会读取到不同的输出。另外,MapReduce框架确保map和reduce function的输出是原子的:要么没有,要么是map和reduce function一次调用的完全输出(本实验没有实现这个功能,而是简单地停止worker,所以不会出现任务的并发执行问题)。
思路:
1、worker的fail实现是通过设置可被调用的rpc次数来实现的。
2、一旦worker关闭监听,master调用call将失败,尝试选取其他worker。因此,对于每一个任务的goroutine,借助for无限循环模拟worker失败并重新选取worker执行任务的操作,一旦调用成功则跳出循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
var ntasks int
var n_other int // number of inputs (for reduce) or outputs (for map)
switch phase {
case mapPhase:
ntasks = len(mapFiles)
n_other = nReduce
case reducePhase:
ntasks = nReduce
n_other = len(mapFiles)
}

fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)

var wg sync.WaitGroup
for task:=0;task<ntasks;task++{
wg.Add(1)

go func(taskNum int){
for{ //we use this, cause worker may fail
//wait until we have available worker
worker := <- registerChan

var arg DoTaskArgs
arg.JobName = jobName
arg.Phase = phase
arg.File = mapFiles[taskNum]
arg.TaskNumber = taskNum
arg.NumOtherPhase = n_other
ok := call(worker, "Worker.DoTask", &arg, nil)
if ok == false {
fmt.Printf("Cleanup: RPC %s error\n", worker)
}else{
//remember to use goruntine to write data into registerChan
//because last time we write data into it, it will block until someone read it
//however, no one read from it, so the last wg.Done() will not been executed
//and map-phase never finish
go func(){
//again, worker is free
registerChan <- worker
}()
wg.Done()
break
}
}
}(task)
}
wg.Wait()

fmt.Printf("Schedule: %v phase done\n", phase)
}

测试:

1
2
$ cd "$GOPATH/src/mapreduce"
$ go test -run Failure

Part V: Inverted index generation

本部分将借助map和reduce function来生成反向索引。反向索引广泛应用于文档搜索。一般来讲,反向索引是底层数据到其原始位置的一个映射,比如文档搜索的反向索引是由关键字映射到包含这些关键字的文档。
修改main/ii.go的mapF和reduceF以生成反向索引。

思路:
mapF函数:使用strings.FieldsFunc获取文本的所有单词,然后为每个单词创建“单词:文件名”键值对,返回所有键值对。
reduceF函数:对同一个单词的所有文件名进行归纳,剔除重复的文件名,然后对文件名进行排序,按照“次数 文件名1,文件名2”的格式返回。

1
2
3
4
5
6
7
8
9
10
11
func mapF(document string, value string) (res []mapreduce.KeyValue) {
// TODO: you should complete this to do the inverted index challenge
kvs := make([]mapreduce.KeyValue,0)
values := strings.FieldsFunc(value,func(c rune)bool{
return !unicode.IsLetter(c)
})
for _,v:=range values{
kvs = append(kvs,mapreduce.KeyValue{Key:v,Value:document})
}
return kvs
}

1
2
3
4
5
6
7
8
9
10
11
12
13
func reduceF(key string, values []string) string {
// TODO: you should complete this to do the inverted index challenge
m := make(map[string]string,0)
for _,v:=range values{
m[v] = ""
}
var keys []string
for k := range m {
keys = append(keys, k)
}
sort.Strings(keys)
return strconv.Itoa(len(keys)) + " " + strings.Join(keys,",")
}

测试:

1
2
3
$ cd "$GOPATH/src/main"
$ go run ii.go master sequential pg-*.txt
$ head -n5 mrtmp.iiseq


1
2
$ bash ./test-ii.sh
$ head -n5 mrtmp.iiseq

最终测试结果

测试:

1
2
$ cd "$GOPATH/src/main"
$ bash ./test-mr.sh

显示 Gitment 评论