综述
什么是MapReduce:MapReduce是一种编程模型,用于大规模数据集的并行运算。软件实现指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对;指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。
MapReduce的应用情况:master的schedule调度函数负责根据任务数分配任务给worker,并同步worker的工作。worker启动时指定Map函数和Reduce函数,接收master调遣时根据任务标志来决定执行Map函数还是Reduce函数。在不同的应用场景下,用户对Map函数和Reduce函数进行自定义,并在启动worker时进行注册即可。
例子1:单词统计程序中:Map任务将读取文件内容,调用Map函数将文件内容映射成一组新的键值对,然后将映射结果输出到对应的Reduce中间文件中。其中,Map(映射)函数将文件内容映射成一组新的键值对,键为单词,值为出现次数。
Reduce任务读取对应的Reduce中间文件中,保证所有相同“单词:次数”键值对共享相同的键组,调用Reduce(归约)函数统计该单词出现的总次数,最后将结果写入到中间文件。
例子2:反向索引生成程序中:Map任务将读取文件内容,调用Map函数将文件内容映射成一组新的键值对,然后将映射结果输出到对应的Reduce中间文件中。其中,Map(映射)函数将文件内容映射成一组新的键值对,键为单词,值为出现文件名。
Reduce任务读取对应的Reduce中间文件中,保证所有相同“单词:文件名”键值对共享相同的键组,调用Reduce(归约)函数统计该单词出现的次数和所有文件名,最后将结果写入到中间文件。
实验内容
准备
1、Go version 1.7
2、参考资料:
Pro Git book,git user’s manual,Git for Computer Scientists
3、项目地址:git://g.csail.mit.edu/6.824-golabs-2017
1 | $ git clone git://g.csail.mit.edu/6.824-golabs-2017 6.824 |
实验准备了顺序和分布式两种Map/Reduce模式。顺序模式启动第一个map任务,然后是第二个、第三个…所有map任务完成后启动reduce任务,该方法速度慢,但有利于调试。分布式模式在启动第一个map任务时就并发启动多个worker线程,然后reduce任务,该方法速度快,但不利于实现和调试。
Preamble: Getting familiar with the source
mapreduce提供了简单的Map/Reduce库。master.go的Distributed()函数和Sequential()函数分别提供了分布式模式和顺序模式用于启动job。job包括了以下内容:
1、提供许多输入文件、一个map函数、一个reduce函数、reduce任务的数目nReduce。
2、master将启动rpc服务(master_rpc.go
),等待worker的注册(Register() in master.go)。当map task和reduce task可分配时,master分配这些task到对应的worker(schedule() in schedule.go)。
3、master将每个输入文件当成一个map task,启动nMap个map task/worker,每个worker调用doMap()将输入文件的key/value pairs映射到nReduce个中间文件中,映射方法是对每一个pair的key进行哈希。map阶段将产生nMap x nReduce个中间文件,每个worker机器在本地存储nReduce个中间文件。在本实验中,文件存于本机,不同worker都是运行在本机上;但在实际中,每个worker应当被允许读取其他worker写入的文件,可以通过GFS在不同机器上运行worker,并使用分布式存储系统进行存储。
4、master启动reduce阶段(doReduce() in common_reduce.go
)。每个map task/worker有nReduce个中间文件,对于reduce task R,R将会收集每个map task/worker的第R个中间文件。reduse阶段将生成nReduce个文件。
5、master启动merge阶段(mr.merge() in master_splitmerge.go
),将nReduce个文件合并为一个文件。
6、master为每一个worker发送Shutdown RPC,然后关闭rpc服务。
Part I: Map/Reduce input and output
完成以下内容:
1、为每一个map task切分输出文件,实现doMap() in common_map.go
。
2、为每一个reduce task收集输入文件,实现doReduce() in common_reduce.go
。
思路:
1、doMap函数负责一个map task,这里对应读入一个文件内容。然后,调用map function将文件内容处理为KeyValue数组,遍历每一个KeyValue,对Key进行哈希,并输出到对应的reduce中间文件。doMap产生nReduce个中间文件。
2、doReduce函数负责处理所有map task产生的第r个reduce中间文件。读取每一个中间文件,将KeyValue映射到结果集中,相同key的value存于同一个数组中。接着,对key进行排序。最后,针对结果集的每一项key->values[],调用reduce function将values[]连接为字符串new value,并将key->new value写入到reduce task的目标文件中,该文件按key字段排序。
1 | func doMap( |
1 | func doReduce( |
测试:
1 | $ export "GOPATH=$PWD" |
输出调试结果:
1 | common.go加入debugEnabled = true |
运行结果
Part II: Single-worker word count
任务:
完成单词计数功能,为main/wc.go提供map function和reduce function。map function主要根据输入的文件内容对单词进行提取和计数,reduce function则是对相同key的value次数进行统计。
思路:
1、单词的提取使用strings.FieldsFunc函数,单词的定义是连续的letter字符串,提取如下:
1 | ss := strings.FieldsFunc(value, func(c rune) bool { |
2、注意,如果参数输入为go run wc.go master sequential pg-*.txt
,golang接收为os.Args[3:]
,可以自动识别匹配的文件个数。
1 | func mapF(filename string, contents string) []mapreduce.KeyValue { |
测试:
1 | $ cd "$GOPATH/src/main" |
或
1 | $ bash ./test-wc.sh |
测试结果
Part III: Distributing MapReduce tasks
mapreduce的一大卖点正是它可以将顺序模式代码并行化,而且开发者无需做任何改变。此部分内容将使用RPC来模拟分布式计算(没能够真正地在多台机器上机器上进行Map/Reduce部署)。
master设计思路:
1、暴露mapreduce.Distributed接口。根据参数构造master数据结构,包括rpc地址、done channel、互斥锁、条件变量、文件名数组、net.Listener、rpc shutdown channel等。
2、启动rpc服务。先创建rpc服务,然后注册master结构。接着,master使用net.Listen进行监听,并使用net.Listener.Accept()接收连接,最后使用Server.ServeConn(conn)处理连接。当worker调用master暴露的rpc接口时将产生连接请求。
1 | rpcs := rpc.NewServer() |
其中 Server.Register 用于注册RPC服务,默认的名字是对象的类型名字(这里是Echo)。如果需要指定特殊的名字,可以用 Server.RegisterName 进行注册。被注册对象的类型所有满足以下规则的方法会被导出到RPC服务接口:func (t *T) MethodName(argType T1, replyType *T2) error
。被注册对应至少要有一个方法满足这个特征,否则可能会注册失败。
master导出的服务接口包括func (mr *Master) Register(args *RegisterArgs, _ *struct{}) error
(用于worker注册)、func (mr *Master) Shutdown(_, _ *struct{}) error
(用于停止自身rpc服务)。
3、调用schedule启动map阶段。在此之前,使用string channel等待worker的注册(sync.Cond.Wait()),一旦注册了worker(sync.Cond.Broadcast()),将其rpc地址写入到string channel中。schedule函数参数包括该string channel,意味着只会使用已注册的worker。当然,如果在启动rpc服务之后、调用schedule之前已经有worker注册了,master将会依次将已注册的worker的rpc地址写入到string channel中,而schedule函数将从该channel中读取。
需要注意的是条件变量的用法,不同于pthread条件变量的初始化(条件变量和搭配使用的互斥锁),golang的初始化只使用了一个参数,sync.NewCond(master)用于初始化master的cond,参数是master结构体,master结构体需包含一个互斥锁和一个条件变量。
map阶段将根据输入文件数目确定map task数,然后分配给worker。每个worker负责一个或多个map task,将对应的文件内容哈希到nReduce个文件中。
4、调用schedule启动reduce阶段。该阶段继续使用map阶段的worker和新注册的worker。
reduce阶段将根据nReduce确定reduce task数,然后分配给worker。每个worker读取负责map的worker所保存的第r个文件,将其按照key有序规律输出到统计文件中。
5、关闭与workers的连接,关闭rpc服务。
6、启动merge阶段。按照key有序规律将nReduce个统计文件输出到最终文件中。
7、结束计算。
worker设计思路:
1、暴露mapreduce.RunWorker接口。根据参数构造worker数据结构,包括map function、reduce function、可被rpc调用的次数、rpc地址等。
2、worker向master注册。
3、启动rpc服务。先创建rpc服务,然后注册worker结构。接着,worker使用net.Listen进行监听,并使用net.Listener.Accept()接收连接,最后使用Server.ServeConn(conn)处理连接。当master调用worker暴露的rpc接口时将产生连接请求。RunWorker的最后一个参数表示该worker可被调用的rpc次数,一旦超过次数,worker将关闭监听。
worker暴露的rpc接口有func (wk *Worker) DoTask(arg *DoTaskArgs, _ *struct{}) error
(用于master分配工作)、func (wk *Worker) Shutdown(_ *struct{}, res *ShutdownReply) error
(用于工作完成时被master调用)。
4、worker结构体需包含一个互斥锁。
5、master可以分配一个或多个task给一个work,但必须等待上一个task完成才能分配下一个task。
schedule设计思路:
1、计算任务数。
2、对每一个任务创建一个goroutine。在每一个goroutine中,通过registerChan阻塞等待可用worker,master将会依次将已注册的worker的rpc地址写入到该channel中,完成任务之后对应的worker也会尝试写入到该channel中。channel的设计满足了“必须等待上一个task完成才能分配下一个task”的需求。
3、使用sync.WaitGroup进行任务同步。master必须等待所有worker完成map阶段之后才进行reduce阶段。
4、对registerChan的写入必须使用goroutine完成,否则map阶段当最后一个worker完成任务时,将会阻塞写入registerChan,而此时没有对registerChan进行读出,map阶段将无法正常完成。
1 | func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) { |
测试:
1 | $ cd "$GOPATH/src/mapreduce" |
测试结果
Part IV: Handling worker failures
本部分是需要处理worker出错的现象。MapReduce的worker没有持久状态,一旦worker出错,master发给该worker的rpc包将失败,因此,一旦发现这种现象,master需要将分配给该worker的任务重新分配给其他worker。
RPC失败不一定表示worker不在执行任务,有可能worker已经执行完任务了但是回复丢失了,或者worker正在执行任务但是master的RPC超时。因此,两个worker可能接收相同的task,并进行计算和生成输出。对一个给定的输入,对map和reduce function的两次调用将生成相同的输出,因此不会出现不一致性,后续程序不会读取到不同的输出。另外,MapReduce框架确保map和reduce function的输出是原子的:要么没有,要么是map和reduce function一次调用的完全输出(本实验没有实现这个功能,而是简单地停止worker,所以不会出现任务的并发执行问题)。
思路:
1、worker的fail实现是通过设置可被调用的rpc次数来实现的。
2、一旦worker关闭监听,master调用call将失败,尝试选取其他worker。因此,对于每一个任务的goroutine,借助for无限循环模拟worker失败并重新选取worker执行任务的操作,一旦调用成功则跳出循环。
1 | func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) { |
测试:
1 | $ cd "$GOPATH/src/mapreduce" |
测试结果
Part V: Inverted index generation
本部分将借助map和reduce function来生成反向索引。反向索引广泛应用于文档搜索。一般来讲,反向索引是底层数据到其原始位置的一个映射,比如文档搜索的反向索引是由关键字映射到包含这些关键字的文档。
修改main/ii.go的mapF和reduceF以生成反向索引。
思路:
mapF函数:使用strings.FieldsFunc获取文本的所有单词,然后为每个单词创建“单词:文件名”键值对,返回所有键值对。
reduceF函数:对同一个单词的所有文件名进行归纳,剔除重复的文件名,然后对文件名进行排序,按照“次数 文件名1,文件名2”的格式返回。
1 | func mapF(document string, value string) (res []mapreduce.KeyValue) { |
1 | func reduceF(key string, values []string) string { |
测试:
1 | $ cd "$GOPATH/src/main" |
或
1 | $ bash ./test-ii.sh |
测试结果
最终测试结果
测试:
1 | $ cd "$GOPATH/src/main" |
测试结果