本文最后更新于:6 个月前
MIT 6.5840 Lab 1: MapReduce
1 任务目标
构建一个MapReduce系统:实现worker进程(调用应用程序的Map和Reduce函数,并负责文件的读取和写入)和coordinator进程(把任务分发给各个worker,并处理失败的worker)。只需实现mr文件夹下的三个文件(coordinator.go、worker.go、rpc.go)即可。
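2 任务思路
整体流程如下:
- coordinator启动时为每个输入文件生成一个map任务,放入map任务队列(channel)。
- worker通过RPC(Coordinator.PollTask)不断向coordinator请求任务。
- map任务读取一个输入文件并调用Map函数,按ihash(key) % nReduce把中间结果分桶,写入mr-tmp-<map任务id>-<桶号>文件。
- map阶段结束后进入reduce阶段。每个reduce任务读取属于自己的中间文件(由selectReduceName选出),按key排序后调用Reduce函数,结果写入mr-out-*文件。
- 暂时没有可分配的任务时,worker收到等待任务,休眠后重试;全部完成后,worker收到退出任务。
- coordinator另起一个goroutine检测超时任务,把长时间未完成的任务重新放回队列,以应对worker崩溃。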
3 实现细节
3.1 rpc通信
// TaskType tells a worker what kind of work a Task carries.
type TaskType int

// Phase is the stage the whole job is currently in.
type Phase int

// State is the lifecycle state of one task as tracked by the coordinator.
type State int

// Task is the RPC reply that hands one unit of work to a worker.
type Task struct {
	TaskType   TaskType // map, reduce, wait or exit
	TaskId     int      // task identifier; workers also use it to name output files
	ReducerNum int      // number of reduce buckets (filled in for map tasks)
	FileSlice  []string // input files: one source file for map, intermediate files for reduce
}

// TaskArgs is the (empty) RPC argument a worker sends when asking for work.
type TaskArgs struct{}

// Kinds of task handed to workers.
const (
	MapTask TaskType = iota
	ReduceTask
	WaittingTask // nothing to hand out right now; the worker sleeps and asks again
	ExitTask     // the worker should stop
)

// Phases of the job.
const (
	MapPhase Phase = iota
	ReducePhase
	AllDone
)

// Task states.
const (
	Working State = iota
	Waiting
	Done
)
3.2 worker主函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 func Worker (mapf func (string , string ) []KeyValue, reducef func (string , []string ) string ) { keepFlag := true for keepFlag { task := GetTask() switch task.TaskType { case MapTask: { DoMapTask(mapf, &task) callDone(&task) } case WaittingTask: { time.Sleep(time.Second * 5 ) } case ReduceTask: { DoReduceTask(reducef, &task) callDone(&task) } case ExitTask: { time.Sleep(time.Second) fmt.Println("All tasks are Done ,will be exiting..." ) keepFlag = false } } } time.Sleep(time.Second) }
3.3 worker向coordinator获取任务
1 2 3 4 5 6 7 8 9 10 11 12 13 func GetTask () Task { args := TaskArgs{} reply := Task{} ok := call("Coordinator.PollTask" , &args, &reply) if ok { } else { fmt.Printf("call failed!\n" ) } return reply }
3.4 worker处理map任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 func DoMapTask (mapf func (string , string ) []KeyValue, response *Task) { var intermediate []KeyValue filename := response.FileSlice[0 ] file, err := os.Open(filename) if err != nil { log.Fatalf("cannot open %v" , filename) } content, err := ioutil.ReadAll(file) if err != nil { log.Fatalf("cannot read %v" , filename) } file.Close() intermediate = mapf(filename, string (content)) rn := response.ReducerNum HashedKV := make ([][]KeyValue, rn) for _, kv := range intermediate { HashedKV[ihash(kv.Key)%rn] = append (HashedKV[ihash(kv.Key)%rn], kv) } for i := 0 ; i < rn; i++ { oname := "mr-tmp-" + strconv.Itoa(response.TaskId) + "-" + strconv.Itoa(i) ofile, _ := os.Create(oname) enc := json.NewEncoder(ofile) for _, kv := range HashedKV[i] { err := enc.Encode(kv) if err != nil { return } } ofile.Close() } }
3.5 worker处理reduce任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 func DoReduceTask (reducef func (string , []string ) string , response *Task) { reduceFileNum := response.TaskId intermediate := shuffle(response.FileSlice) dir, _ := os.Getwd() tempFile, err := ioutil.TempFile(dir, "mr-tmp-*" ) if err != nil { log.Fatal("Failed to create temp file" , err) } i := 0 for i < len (intermediate) { j := i + 1 for j < len (intermediate) && intermediate[j].Key == intermediate[i].Key { j++ } var values []string for k := i; k < j; k++ { values = append (values, intermediate[k].Value) } output := reducef(intermediate[i].Key, values) fmt.Fprintf(tempFile, "%v %v\n" , intermediate[i].Key, output) i = j } tempFile.Close() fn := fmt.Sprintf("mr-out-%d" , reduceFileNum) os.Rename(tempFile.Name(), fn) }
3.6 洗牌(排序)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func shuffle (files []string ) []KeyValue { var kva []KeyValue for _, filepath := range files { file, _ := os.Open(filepath) dec := json.NewDecoder(file) for { var kv KeyValue if err := dec.Decode(&kv); err != nil { break } kva = append (kva, kv) } file.Close() } sort.Sort(SortedKey(kva)) return kva }
1 2 3 4 5 6 type SortedKey []KeyValuefunc (k SortedKey) Len() int { return len (k) }func (k SortedKey) Swap(i, j int ) { k[i], k[j] = k[j], k[i] }func (k SortedKey) Less(i, j int ) bool { return k[i].Key < k[j].Key }
3.7 coordinator定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 var ( mu sync.Mutex )type Coordinator struct { ReducerNum int TaskId int DistPhase Phase ReduceTaskChannel chan *Task MapTaskChannel chan *Task taskMetaHolder TaskMetaHolder files []string }type TaskMetaInfo struct { state State StartTime time.Time TaskAdr *Task }type TaskMetaHolder struct { MetaMap map [int ]*TaskMetaInfo }
3.8 coordinator主函数
先初始化coordinator,然后调用makeMapTasks,把每个输入文件对应的map任务全部放入map队列,再开启server监听worker的请求即可。最后还另起了一个goroutine(CrashDetector)做崩溃检测,目的是通过最后一项crash test,详见3.11节。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func MakeCoordinator (files []string , nReduce int ) *Coordinator { c := Coordinator{ files: files, ReducerNum: nReduce, DistPhase: MapPhase, MapTaskChannel: make (chan *Task, len (files)), ReduceTaskChannel: make (chan *Task, nReduce), taskMetaHolder: TaskMetaHolder{ MetaMap: make (map [int ]*TaskMetaInfo, len (files)+nReduce), }, } c.makeMapTasks(files) c.server() go c.CrashDetector() return &c }
3.9 coordinator初始化map队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 func (c *Coordinator) makeMapTasks(files []string ) { for _, v := range files { id := c.generateTaskId() task := Task{ TaskType: MapTask, TaskId: id, ReducerNum: c.ReducerNum, FileSlice: []string {v}, } taskMetaInfo := TaskMetaInfo{ state: Waiting, TaskAdr: &task, } c.taskMetaHolder.acceptMeta(&taskMetaInfo) c.MapTaskChannel <- &task } }
3.10 coordinator初始化reduce队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 func (c *Coordinator) makeReduceTasks() { for i := 0 ; i < c.ReducerNum; i++ { id := c.generateTaskId() task := Task{ TaskId: id, TaskType: ReduceTask, FileSlice: selectReduceName(i), } taskMetaInfo := TaskMetaInfo{ state: Waiting, TaskAdr: &task, } c.taskMetaHolder.acceptMeta(&taskMetaInfo) c.ReduceTaskChannel <- &task } }
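3.11 补充:crash test(崩溃检测)
crash test会让部分worker中途崩溃或长时间卡住。CrashDetector每2秒扫描一次全部任务的元数据:如果某个任务处于Working状态已超过9秒,就认为执行它的worker已崩溃,把该任务重新放回对应的map/reduce队列,并将状态改回Waiting,等待其他worker领取。整个作业进入AllDone阶段后,检测线程退出。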
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 func (c *Coordinator) CrashDetector() { for { time.Sleep(time.Second * 2 ) mu.Lock() if c.DistPhase == AllDone { mu.Unlock() break } for _, v := range c.taskMetaHolder.MetaMap { if v.state == Working { } if v.state == Working && time.Since(v.StartTime) > 9 *time.Second { fmt.Printf("the task[ %d ] is crash,take [%d] s\n" , v.TaskAdr.TaskId, time.Since(v.StartTime)) switch v.TaskAdr.TaskType { case MapTask: c.MapTaskChannel <- v.TaskAdr v.state = Waiting case ReduceTask: c.ReduceTaskChannel <- v.TaskAdr v.state = Waiting } } } mu.Unlock() } }