本文最后更新于:6 个月前
mit6.5840 lab1:MapReduce
写在前面
3月投了很多实习简历,经历了很多笔试面试,意识到自己的技术栈依然浅薄,所以开启我的第三个正式项目:基于go的分布式kv数据库。希望能掌握到go技术栈和分布式的知识。
正文
1 任务目标
构建一个MapReduce系统,实现worker进程(该进程调用应用程序 Map 和 Reduce 函数并处理文件的读取和写入)和coordinator进程(该进程将任务分发给工作线程并处理失败的worker线程)。实现mr文件夹下的三个文件即可。
2 任务思路
coordinator应该存在一个状态,用以告诉worker当前系统处于什么阶段,应该执行什么任务。
coordinator应该存放有任务队列,用以将任务发放。
coordinator应该告诉worker一些必要信息。
worker完成任务后应该通知coordinator,否则若超时将视为任务失败,将把超时任务分发给其他worker。
因为可能在同一时刻,有多个worker同时访问coordinator,所以coordinator在某些地方应该上锁。
因为在同一个文件系统,所以worker在执行任务时,应当先创建临时文件,再通过os.Rename()重命名,以免出现一些错误。
每个worker应该有一个ID,以便避免任务超时重发时出现的相关bug。
在本实验中,由于每个文件都不大,所以就不进行文件切分,而是把每个小文件当成一个map任务,然后产生nReduce数量的reduce任务和最终结果。
在Map阶段所有任务完成之后,再进行Reduce。
3 实现细节
3.1 rpc通信
在MapReduce中worker需要不断向coordinator请求任务,在完成任务时也需要向coordinator进行汇报。在实现上worker要和coordinator通信时只需要调用call方法就可以了,注意里面的参数有三个,分别是要调用的coordinator里的方法名、要传的参数以及接收数据用的数据结构,在该lab中传参为空即可。
插叙:剖析一下call方法,主要有两个步骤实现rpc通信。一是通过rpc.DialHTTP和本地的coordinator建立连接,二是调用rpc.DialHTTP的返回值的Call方法实现通信。
所以要解决的事情就在接收数据用的数据结构上面,把它在rpc文件里面定义出来就行了。(这里定义的时候可以根据需要随时加入内容,图方便就全放上来了。)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 type Task struct { TaskType TaskType TaskId int ReducerNum int FileSlice []string }type TaskArgs struct {}type TaskType int type Phase int type State int const ( MapTask TaskType = iota ReduceTask WaittingTask ExitTask )const ( MapPhase Phase = iota ReducePhase AllDone )const ( Working State = iota Waiting Done )
3.2 worker主函数
主函数要做的就是不断循环,通过rpc向coordinator获取任务,再根据任务的类型做相应的处理(调不同的函数就行了)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 func Worker (mapf func (string , string ) []KeyValue, reducef func (string , []string ) string ) { keepFlag := true for keepFlag { task := GetTask() switch task.TaskType { case MapTask: { DoMapTask(mapf, &task) callDone(&task) } case WaittingTask: { time.Sleep(time.Second * 5 ) } case ReduceTask: { DoReduceTask(reducef, &task) callDone(&task) } case ExitTask: { time.Sleep(time.Second) fmt.Println("All tasks are Done ,will be exiting..." ) keepFlag = false } } } time.Sleep(time.Second) }
3.3 worker向coordinator获取任务
上面介绍rpc时候说了,调call方法就行,放置一个在rpc里定义的结构体去接收coordinator传回来的任务。这里调的Coordinator.PollTask就是在coordinator里面用来发任务的函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 func GetTask () Task { args := TaskArgs{} reply := Task{} ok := call("Coordinator.PollTask" , &args, &reply) if ok { } else { fmt.Printf("call failed!\n" ) } return reply }
3.4 worker处理map任务
核心其实是mapf函数,但原代码已经提供给我们了,所以只需要把mapf的参数弄好就可以了。简简单单读取个文件,作为参数调用mapf,然后按照MapReduce的逻辑,将mapf返回来的kv数组划分成长度为nReduce(系统提供)的二维切片保存到本地作为临时文件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 func DoMapTask (mapf func (string , string ) []KeyValue, response *Task) { var intermediate []KeyValue filename := response.FileSlice[0 ] file, err := os.Open(filename) if err != nil { log.Fatalf("cannot open %v" , filename) } content, err := ioutil.ReadAll(file) if err != nil { log.Fatalf("cannot read %v" , filename) } file.Close() intermediate = mapf(filename, string (content)) rn := response.ReducerNum HashedKV := make ([][]KeyValue, rn) for _, kv := range intermediate { HashedKV[ihash(kv.Key)%rn] = append (HashedKV[ihash(kv.Key)%rn], kv) } for i := 0 ; i < rn; i++ { oname := "mr-tmp-" + strconv.Itoa(response.TaskId) + "-" + strconv.Itoa(i) ofile, _ := os.Create(oname) enc := json.NewEncoder(ofile) for _, kv := range HashedKV[i] { err := enc.Encode(kv) if err != nil { return } } ofile.Close() } }
3.5 worker处理reduce任务
大体思路和map差不多,不同点在于在调用reducef前要先取出来该任务对应的这些中间文件,对它们做一次洗牌(按键值从小到大排序,下面再说)。在调用reducef后将返回值写入到一个文件就行了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 func DoReduceTask (reducef func (string , []string ) string , response *Task) { reduceFileNum := response.TaskId intermediate := shuffle(response.FileSlice) dir, _ := os.Getwd() tempFile, err := ioutil.TempFile(dir, "mr-tmp-*" ) if err != nil { log.Fatal("Failed to create temp file" , err) } i := 0 for i < len (intermediate) { j := i + 1 for j < len (intermediate) && intermediate[j].Key == intermediate[i].Key { j++ } var values []string for k := i; k < j; k++ { values = append (values, intermediate[k].Value) } output := reducef(intermediate[i].Key, values) fmt.Fprintf(tempFile, "%v %v\n" , intermediate[i].Key, output) i = j } tempFile.Close() fn := fmt.Sprintf("mr-out-%d" , reduceFileNum) os.Rename(tempFile.Name(), fn) }
3.6 洗牌(排序)
为了模拟真正分布式运行的操作方式,在这里用了一个json编码解码的冗余操作。核心在于让键值对结构体按键做排序,所以重写了sort接口,在实例程序中有所体现,复用即可(正好前几天美团二面时也问到了这个问题)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func shuffle (files []string ) []KeyValue { var kva []KeyValue for _, filepath := range files { file, _ := os.Open(filepath) dec := json.NewDecoder(file) for { var kv KeyValue if err := dec.Decode(&kv); err != nil { break } kva = append (kva, kv) } file.Close() } sort.Sort(SortedKey(kva)) return kva }
1 2 3 4 5 6 type SortedKey []KeyValuefunc (k SortedKey) Len() int { return len (k) }func (k SortedKey) Swap(i, j int ) { k[i], k[j] = k[j], k[i] }func (k SortedKey) Less(i, j int ) bool { return k[i].Key < k[j].Key }
3.7 coordinator定义
注释详细。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 var ( mu sync.Mutex )type Coordinator struct { ReducerNum int TaskId int DistPhase Phase ReduceTaskChannel chan *Task MapTaskChannel chan *Task taskMetaHolder TaskMetaHolder files []string }type TaskMetaInfo struct { state State StartTime time.Time TaskAdr *Task }type TaskMetaHolder struct { MetaMap map [int ]*TaskMetaInfo }
3.8 coordinator主函数
初始化coodinator,然后调函数把文件全部放入map队列里面,再开启server监听worker的请求就行了。最后又起了一个线程去做crash检测,目的是通过最后一项crash test,稍后再说。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func MakeCoordinator (files []string , nReduce int ) *Coordinator { c := Coordinator{ files: files, ReducerNum: nReduce, DistPhase: MapPhase, MapTaskChannel: make (chan *Task, len (files)), ReduceTaskChannel: make (chan *Task, nReduce), taskMetaHolder: TaskMetaHolder{ MetaMap: make (map [int ]*TaskMetaInfo, len (files)+nReduce), }, } c.makeMapTasks(files) c.server() go c.CrashDetector() return &c }
3.9 coordinator初始化map队列
把文件名字符串数组挨个转化成task形式,加入到map队列里就行了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 func (c *Coordinator) makeMapTasks(files []string ) { for _, v := range files { id := c.generateTaskId() task := Task{ TaskType: MapTask, TaskId: id, ReducerNum: c.ReducerNum, FileSlice: []string {v}, } taskMetaInfo := TaskMetaInfo{ state: Waiting, TaskAdr: &task, } c.taskMetaHolder.acceptMeta(&taskMetaInfo) c.MapTaskChannel <- &task } }
3.10 coordinator初始化reduce队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 func (c *Coordinator) makeReduceTasks() { for i := 0 ; i < c.ReducerNum; i++ { id := c.generateTaskId() task := Task{ TaskId: id, TaskType: ReduceTask, FileSlice: selectReduceName(i), } taskMetaInfo := TaskMetaInfo{ state: Waiting, TaskAdr: &task, } c.taskMetaHolder.acceptMeta(&taskMetaInfo) c.ReduceTaskChannel <- &task } }
3.11 补充:crash test?
这一部分是应对当worker崩溃时候的措施。循环检查当前任务状态,如果超时就将任务收回,重新加入到队列中去。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 func (c *Coordinator) CrashDetector() { for { time.Sleep(time.Second * 2 ) mu.Lock() if c.DistPhase == AllDone { mu.Unlock() break } for _, v := range c.taskMetaHolder.MetaMap { if v.state == Working { } if v.state == Working && time.Since(v.StartTime) > 9 *time.Second { fmt.Printf("the task[ %d ] is crash,take [%d] s\n" , v.TaskAdr.TaskId, time.Since(v.StartTime)) switch v.TaskAdr.TaskType { case MapTask: c.MapTaskChannel <- v.TaskAdr v.state = Waiting case ReduceTask: c.ReduceTaskChannel <- v.TaskAdr v.state = Waiting } } } mu.Unlock() } }
写在后面
大概主要的函数都写在上面了,有一些简单的函数没体现在上面,不过大致思路弄明白了相信就能轻松实现了。这一个lab大概明白了分布式的一个基本思路,不过还没接触到真正分布式的东西,接着往后学吧。