
本文共 8158 字,大约阅读时间需要 27 分钟。
最近终于抽出时间开始学习MIT 6.824
,本文为我看MapReduce论文和做lab后的总结。
[]
lab要用到go语言,这也是我第一次接触。可以参考go语言圣经学习基本语法。
[]
MapReduce 简介
MapReduce描述了一种编程模型,由处理数据的map函数生成中间键值对(Key/Value),再由Reduce函数处理中间键值对生成输出文件。根据用户自定义的map和reduce函数不同,可以实现不同的功能。下面我简单总结了个人觉得比较关键的部分。
执行概述
在MapReduce中存在两种程序:master (也即lab中的coordinator) 和worker。master负责分配任务和接受反馈,更新任务列表。worker负责完成map和reduce任务。
在程序运行前,mapreduce库会将输入的数据切分成M个片段,即M个map任务,然后启动master和worker。master会对每个空闲的worker分配map或者reduce任务。
被分配了map任务的worker读取相应任务的数据,解析出键值对,生成中间键值对存入本地磁盘。这些键值对根据key的不同,被分区函数划分到R个区域内。worker将这些数据的位置传回master,master会将这些位置转发给执行reduce操作的worker。
执行reduce任务的worker在接受任务后,使用RPC的方式读取数据,并根据key进行排序,然后调用reduce函数生成R个输出文件。
容错
由于数百上千台机器同时运行,发生网络故障/设备中断是常有的事情,因此需要应对故障的方案。
worker故障
master会周期性的ping下每个worker,如果在一定时间内收不到来自某个worker的响应,master就会将该worker标记为failed,该worker正在执行的任务会被重置为【待执行】。master会将这些任务交给其他worker重新执行。
对于已经执行完的任务。如果是已完成的map任务,由于中间数据储存在发生故障的worker磁盘中,无法读取,因此需要重新执行该任务。如果是reduce任务则无须再执行,因为完成时输出文件已经储存在全局系统中。
master故障
一种解决方案是,将master上的数据周期性地写入磁盘,发生故障后从最新的checkpoint创建出一个新的备份,重启master进程。但往往需要人工干预。
Master的数据结构
在Master中包含了一些数据结构。它保存了每个Map任务和每个Reduce任务的状态(闲置,正在运行,以及完成),以及非空闲任务的worker机器的ID。
备用任务
在MapReduce计算中,一台机器花费了异常长的时间去完成最后几个Map或者Reduce任务会导致执行总时间延长很多。因此当一个MapReduce任务接近完成时,master可以调度一个备用(backup)任务来执行正在执行的任务。无论是主任务还是备用任务完成,都视为整个计算完成。可以显著减少大型计算花费的时间。
Lab1 总结
虽然看论文的时候感觉自己对MapReduce的执行过程了解的比较透彻,但是在实际实现全过程的时候才发现有很多地方没有注意到。果然是实践出真知。
在过程中遇到的一个比较大的坑是我对go的struct
不够了解。go中struct
用变量名的首字母大小写来区分public和private(可导出和不可导出),习惯了驼峰命名法的我并没有注意到。因此在测试的时候才需要全盘修改,花费了一些精力。
在lab中我们要实现的是一个在本地机器执行的mapreduce任务。和论文中介绍的不同,这个mapreduce没有实现对worker的周期检测,也不需要储存每个worker的状态,而是当worker在一段时间(lab中为10s)内没有完成任务时,直接将该worker视为故障,重新分配任务。并且任务时由worker主动申请再由master进行分配。这对于小任务是可行的,但对于无法预测时间的大型任务,应当按论文中进行实现。
在执行过程中,必须要先将map任务全部执行完,才能执行reduce任务。因为reduce任务要读取全部数据进行排序。当map任务已经分配完但没有全部完成时,部分没有任务可以执行的worker可能会空转。
for { switch reply.State{ case 0: //map任务 case 1: //reduce任务 case 2: continue //暂时没有任务,等待下一次申请 case 3: break //所有任务均已完成,worker停止工作 }
在任务分配上,我简单的采用了数组初始化所有任务,在分配任务时从数组中寻找【待执行】的任务(即state为0),更优化的方式可以考虑任务队列。将任务依次入队,对已经执行的任务出队,如果任务执行失败(超时),则重新入队。这样免去了遍历的过程。
因为在mrcoordinator.go
中我们可以看到,每隔1s中会执行一次c.Done()
,因此可以在c.Done()
中增加每次任务的运行时间。
m := mr.MakeCoordinator(os.Args[1:], 10) for m.Done() == false { time.Sleep(time.Second) }
论文中提到通过写入临时文件并重命名它的方式,可以避免在崩溃生成部分写入的文件,造成混乱。ioutil
库可以创建临时文件,并在写入结束后重命名为标准文件格式。
在实际上手时,可以先从worker开始,根据程序中给的example,分析执行过程,再在程序中添加对应的实现。RPC调用的函数必须要有返回值,否则运行时会报错找不到该函数。
实现代码
以下为实现代码,通过了全部测试。
//worker.gopackage mrimport "fmt"import "log"import "net/rpc"import "hash/fnv"import ( "time" "os" "sort" "io/ioutil" "strconv" "encoding/json")//// Map functions return a slice of KeyValue.//type KeyValue struct { Key string Value string}type ByKey []KeyValue// for sorting by key.func (a ByKey) Len() int { return len(a) }func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }//// use ihash(key) % NReduce to choose the reduce// task number for each KeyValue emitted by Map.//func ihash(key string) int { h := fnv.New32a() h.Write([]byte(key)) return int(h.Sum32() & 0x7fffffff)}//// main/mrworker.go calls this function.//func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) { // Your worker implementation here. // uncomment to send the Example RPC to the coordinator. // CallExample() for { time.Sleep(time.Second) //睡眠一秒再接任务 args := ASKArgs{} reply := ASKReply{} callAskTask(&args, &reply) taskNumber := reply.TaskNumber switch reply.State{ case 0: file, err := os.Open(reply.FileName) if err != nil { log.Fatalf("cannot open mapTask file", reply.FileName) } content, err := ioutil.ReadAll(file) if err != nil { log.Fatalf("cannot read mapTask file", reply.FileName) } file.Close() kva := mapf(reply.FileName, string(content)) //写入mr-taskNumber-y文件中 WriteMiddleFile(kva, taskNumber, reply.NReduce) case 1: intermediate := []KeyValue{} nmap := reply.NMap for i:=0; i
//rpc.gopackage mr//// RPC definitions.//// remember to capitalize all names.//import "os"import "strconv"//// example to show how to declare the arguments// and reply for an RPC.//type ASKArgs struct { //申请时不需要任何信息}type ASKReply struct { State int //0-map 1-reduce 2-wait 3-shutdown FileName string //文件名 TaskNumber int //任务号 NReduce int //reduce任务中的分区数 NMap int //Map任务的总数}type FinishAgrs struct{ State int //同reply,用于更新Coordinator状态 TaskNumber int}type FinishReply struct{ State int //0-继续接受任务 1-任务全部完成,关闭worker}// Add your RPC definitions here.// Cook up a unique-ish UNIX-domain socket name// in /var/tmp, for the coordinator.// Can't use the current directory since// Athena AFS doesn't support UNIX-domain sockets.func coordinatorSock() string { s := "/var/tmp/824-mr-" s += strconv.Itoa(os.Getuid()) return s}
//coordinator.gopackage mrimport "log"import "net"import "os"import "net/rpc"import "net/http"import ( "sync")//缺少检测故障,不能主动分配任务//由设备主动申请任务,不需要轮训检查设备是否响应,因此不需要机器号type Coordinator struct { State int //0-map 1-reduce 2-finish NMap int //map任务总数 NReduce int //reduce分区数 MapTask map[int]*mapTask //map任务数组 ReduceTask map[int]*reduceTask //reduce任务数组 Mu sync.Mutex}type mapTask struct { FileName string State int //0-待做 1-进行中 2-已完成 RunTime int}type reduceTask struct { State int //0-待做 1-进行中 2-已完成 RunTime int}func (c *Coordinator) TickTick() { if c.State == 0 { for TaskNumber, task := range(c.MapTask){ if task.State == 1 { c.MapTask[TaskNumber].RunTime += 1 if c.MapTask[TaskNumber].RunTime>=10 { c.MapTask[TaskNumber].State = 0 } } } } else if c.State == 1 { for TaskNumber, task := range(c.ReduceTask){ if task.State == 1 { c.ReduceTask[TaskNumber].RunTime += 1 if c.ReduceTask[TaskNumber].RunTime>=10 { c.ReduceTask[TaskNumber].State = 0 } } } }}func (c *Coordinator) ASKTask(args *ASKArgs, reply *ASKReply) error{ c.Mu.Lock() defer c.Mu.Unlock() reply.State = 2 reply.NMap = c.NMap reply.NReduce = c.NReduce switch c.State { case 0: for TaskNumber, task := range(c.MapTask) { if task.State == 0 { reply.FileName = task.FileName reply.State = 0 reply.TaskNumber = TaskNumber c.MapTask[TaskNumber].State = 1 break } } case 1: for TaskNumber, task := range(c.ReduceTask) { if task.State == 0 { reply.State = 1 reply.TaskNumber = TaskNumber c.ReduceTask[TaskNumber].State = 1 break } } case 2: reply.State = 3 } return nil}func (c *Coordinator) FinishTask(args *FinishAgrs, reply *FinishReply) error{ c.Mu.Lock() defer c.Mu.Unlock() reply.State = 0 if args.State == 0 { c.MapTask[args.TaskNumber].State = 2 c.CheckState() } else { c.ReduceTask[args.TaskNumber].State = 2 c.CheckState() if c.State == 2 { reply.State = 1 } } return nil}func (c *Coordinator) CheckState() { for _, task := range(c.MapTask) { if task.State == 0 || task.State == 1 { c.State = 0 return } } for _, task := range(c.ReduceTask) { if task.State == 0 || task.State == 1 { c.State = 1 return } } c.State = 2}//// start a thread that listens for RPCs from worker.go//func (c *Coordinator) server() { rpc.Register(c) rpc.HandleHTTP() //l, e := net.Listen("tcp", ":1234") sockname := coordinatorSock() os.Remove(sockname) l, e := net.Listen("unix", sockname) if e != nil { log.Fatal("listen error:", e) } go http.Serve(l, nil)}//// main/mrcoordinator.go calls Done() periodically to find out// if the entire job has finished.//func (c *Coordinator) Done() bool { c.Mu.Lock() defer c.Mu.Unlock() ret := false c.TickTick() //在每次检查是否完成时,增加任务时间 if c.State == 2 { ret = true } else { ret = false } return ret}//// create a Coordinator.// main/mrcoordinator.go calls this function.// nReduce is the number of reduce tasks to use.//func MakeCoordinator(files []string, nReduce int) *Coordinator { maptask := make(map[int]*mapTask) reducetask := make(map[int]*reduceTask) for i, filename := range(files) { maptask[i] = &mapTask{FileName: filename, State: 0, RunTime: 0} } for j := 0; j < nReduce; j++ { reducetask[j] = &reduceTask{State: 0, RunTime: 0} } c := Coordinator{State: 0, NMap: len(files), NReduce: nReduce, MapTask: maptask, ReduceTask: reducetask, Mu: sync.Mutex{}} c.server() return &c}
发表评论
最新留言
关于作者
