mit6.5840 lab1:MapReduce

本文最后更新于:6 个月前

mit6.5840 lab1:MapReduce

写在前面

3月投了很多实习简历,经历了很多笔试面试,意识到自己的技术栈依然浅薄,所以开启我的第三个正式项目:基于go的分布式kv数据库。希望能掌握到go技术栈和分布式的知识。

正文

1 任务目标

构建一个MapReduce系统,实现worker进程(该进程调用应用程序 Map 和 Reduce 函数并处理文件的读取和写入)和coordinator进程(该进程将任务分发给工作线程并处理失败的worker线程)。实现mr文件夹下的三个文件即可。

2 任务思路

coordinator应该存在一个状态,用以告诉worker当前系统处于什么阶段,应该执行什么任务。

coordinator应该存放有任务队列,用以将任务发放。

coordinator应该告诉worker一些必要信息。

worker完成任务后应该通知coordinator,否则若超时将视为任务失败,将把超时任务分发给其他worker。

因为可能在同一时刻,有多个worker同时访问coordinator,所以coordinator在某些地方应该上锁。

因为在同一个文件系统,所以worker在执行任务时,应当先创建临时文件,再通过os.Rename()重命名,以免出现一些错误。

每个worker应该有一个ID,以便避免任务超时重发时出现的相关bug。

在本实验中,由于每个文件都不大,所以就不进行文件切分,而是把每个小文件当成一个map任务,然后产生nReduce数量的reduce任务和最终结果。

在Map阶段所有任务完成之后,再进行Reduce。

3 实现细节

3.1 rpc通信

在MapReduce中worker需要不断向coordinator请求任务,在完成任务时也需要向coordinator进行汇报。在实现上worker要和coordinator通信时只需要调用call方法就可以了,注意里面的参数有三个,分别是要调用的coordinator里的方法名、要传的参数以及接收数据用的数据结构,在该lab中传参为空即可。

插叙:剖析一下call方法,主要有两个步骤实现rpc通信。一是通过rpc.DialHTTP和本地的coordinator建立连接,二是调用rpc.DialHTTP的返回值的Call方法实现通信。

所以要解决的事情就在接收数据用的数据结构上面,把它在rpc文件里面定义出来就行了。(这里定义的时候可以根据需要随时加入内容,图方便就全放上来了。)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// Task worker向coordinator获取task的结构体
type Task struct {
TaskType TaskType // 任务类型判断到底是map还是reduce
TaskId int // 任务的id
ReducerNum int // 传入的reducer的数量,用于hash
FileSlice []string // 输入文件的切片,map一个文件对应一个文件,reduce是对应多个temp中间值文件
}

// TaskArgs rpc应该传入的参数,可实际上应该什么都不用传,因为只是worker获取一个任务
type TaskArgs struct{}

// TaskType 对于下方枚举任务的父类型
type TaskType int

// Phase 对于分配任务阶段的父类型
type Phase int

// State 任务的状态的父类型
type State int

// 枚举任务的类型
const (
MapTask TaskType = iota
ReduceTask
WaittingTask // Waittingen任务代表此时为任务都分发完了,但是任务还没完成,阶段未改变
ExitTask // exit
)

// 枚举阶段的类型
const (
MapPhase Phase = iota // 此阶段在分发MapTask
ReducePhase // 此阶段在分发ReduceTask
AllDone // 此阶段已完成
)

// 任务状态类型
const (
Working State = iota // 此阶段在工作
Waiting // 此阶段在等待执行
Done // 此阶段已经做完
)

3.2 worker主函数

主函数要做的就是不断循环,通过rpc向coordinator获取任务,再根据任务的类型做相应的处理(调不同的函数就行了)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
//CallExample()
// 唤醒此worker
keepFlag := true
for keepFlag {
task := GetTask()
switch task.TaskType {
case MapTask:
{
DoMapTask(mapf, &task)
callDone(&task)
}
case WaittingTask:
{
time.Sleep(time.Second * 5)
}
case ReduceTask:
{
DoReduceTask(reducef, &task)
callDone(&task)
}
case ExitTask:
{
time.Sleep(time.Second)
fmt.Println("All tasks are Done ,will be exiting...")
keepFlag = false
}
}
}
time.Sleep(time.Second)
}

3.3 worker向coordinator获取任务

上面介绍rpc时候说了,调call方法就行,放置一个在rpc里定义的结构体去接收coordinator传回来的任务。这里调的Coordinator.PollTask就是在coordinator里面用来发任务的函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
// GetTask 获取任务(需要知道是Map任务,还是Reduce)
func GetTask() Task {
args := TaskArgs{}
reply := Task{}
ok := call("Coordinator.PollTask", &args, &reply)
//wMu.Unlock()
if ok {
//fmt.Println("worker get ", reply.TaskType, "task :Id[", reply.TaskId, "]")
} else {
fmt.Printf("call failed!\n")
}
return reply
}

3.4 worker处理map任务

核心其实是mapf函数,但原代码已经提供给我们了,所以只需要把mapf的参数弄好就可以了。简简单单读取个文件,作为参数调用mapf,然后按照MapReduce的逻辑,将mapf返回来的kv数组划分成长度为nReduce(系统提供)的二维切片保存到本地作为临时文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func DoMapTask(mapf func(string, string) []KeyValue, response *Task) {
var intermediate []KeyValue
filename := response.FileSlice[0]

file, err := os.Open(filename)
if err != nil {
log.Fatalf("cannot open %v", filename)
}
// 通过io工具包获取content作为mapf的参数
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("cannot read %v", filename)
}
file.Close()
// mapf返回一组KV结构体数组
intermediate = mapf(filename, string(content))

//initialize and loop over []KeyValue
rn := response.ReducerNum
// 创建一个长度为nReduce的二维切片
HashedKV := make([][]KeyValue, rn)

for _, kv := range intermediate {
HashedKV[ihash(kv.Key)%rn] = append(HashedKV[ihash(kv.Key)%rn], kv)
}
for i := 0; i < rn; i++ {
oname := "mr-tmp-" + strconv.Itoa(response.TaskId) + "-" + strconv.Itoa(i)
ofile, _ := os.Create(oname)
enc := json.NewEncoder(ofile)
for _, kv := range HashedKV[i] {
err := enc.Encode(kv)
if err != nil {
return
}
}
ofile.Close()
}
}

3.5 worker处理reduce任务

大体思路和map差不多,不同点在于在调用reducef前要先取出来该任务对应的这些中间文件,对它们做一次洗牌(按键值从小到大排序,下面再说)。在调用reducef后将返回值写入到一个文件就行了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func DoReduceTask(reducef func(string, []string) string, response *Task) {
reduceFileNum := response.TaskId
intermediate := shuffle(response.FileSlice)
dir, _ := os.Getwd()
tempFile, err := ioutil.TempFile(dir, "mr-tmp-*")
if err != nil {
log.Fatal("Failed to create temp file", err)
}
i := 0
for i < len(intermediate) {
j := i + 1
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
j++
}
var values []string
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}
output := reducef(intermediate[i].Key, values)
fmt.Fprintf(tempFile, "%v %v\n", intermediate[i].Key, output)
i = j
}
tempFile.Close()

// 在完全写入后进行重命名
fn := fmt.Sprintf("mr-out-%d", reduceFileNum)
os.Rename(tempFile.Name(), fn)
}

3.6 洗牌(排序)

为了模拟真正分布式运行的操作方式,在这里用了一个json编码解码的冗余操作。核心在于让键值对结构体按键做排序,所以重写了sort接口,在实例程序中有所体现,复用即可(正好前几天美团二面时也问到了这个问题)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 洗牌方法,得到一组排序好的kv数组
func shuffle(files []string) []KeyValue {
var kva []KeyValue
for _, filepath := range files {
file, _ := os.Open(filepath)
dec := json.NewDecoder(file)
for {
var kv KeyValue
if err := dec.Decode(&kv); err != nil {
break
}
kva = append(kva, kv)
}
file.Close()
}
// 重写sort接口
sort.Sort(SortedKey(kva))
return kva
}
1
2
3
4
5
6
type SortedKey []KeyValue

// Len 重写len,swap,less才能排序
func (k SortedKey) Len() int { return len(k) }
func (k SortedKey) Swap(i, j int) { k[i], k[j] = k[j], k[i] }
func (k SortedKey) Less(i, j int) bool { return k[i].Key < k[j].Key }

3.7 coordinator定义

注释详细。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 定义为全局,worker之间访问coordinator时加锁
var (
mu sync.Mutex
)

type Coordinator struct {
// Your definitions here.
ReducerNum int // 传入的参数决定需要多少个reducer
TaskId int // 用于生成task的特殊id
DistPhase Phase // 目前整个框架应该处于什么任务阶段
ReduceTaskChannel chan *Task // 使用chan保证并发安全
MapTaskChannel chan *Task // 使用chan保证并发安全
taskMetaHolder TaskMetaHolder // 存着task
files []string // 传入的文件数组
}

// TaskMetaInfo 保存任务的元数据
type TaskMetaInfo struct {
state State // 任务的状态
StartTime time.Time // 任务的开始时间,为crash做准备
TaskAdr *Task // 传入任务的指针,为的是这个任务从通道中取出来后,还能通过地址标记这个任务已经完成
}

// TaskMetaHolder 保存全部任务的元数据
type TaskMetaHolder struct {
MetaMap map[int]*TaskMetaInfo // 通过下标hash快速定位
}

3.8 coordinator主函数

初始化coodinator,然后调函数把文件全部放入map队列里面,再开启server监听worker的请求就行了。最后又起了一个线程去做crash检测,目的是通过最后一项crash test,稍后再说。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{
files: files,
ReducerNum: nReduce,
DistPhase: MapPhase,
MapTaskChannel: make(chan *Task, len(files)),
ReduceTaskChannel: make(chan *Task, nReduce),
taskMetaHolder: TaskMetaHolder{
MetaMap: make(map[int]*TaskMetaInfo, len(files)+nReduce), // 任务的总数应该是files + Reducer的数量
},
}
c.makeMapTasks(files)
c.server()
go c.CrashDetector()
return &c
}

3.9 coordinator初始化map队列

把文件名字符串数组挨个转化成task形式,加入到map队列里就行了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// makeMapTasks 对map任务进行处理,初始化map任务
func (c *Coordinator) makeMapTasks(files []string) {
// 文件->任务->map队列
for _, v := range files {
id := c.generateTaskId()
task := Task{
TaskType: MapTask,
TaskId: id,
ReducerNum: c.ReducerNum,
FileSlice: []string{v},
}

// 保存任务的初始状态
taskMetaInfo := TaskMetaInfo{
state: Waiting, // 任务等待被执行
TaskAdr: &task, // 保存任务的地址
}
c.taskMetaHolder.acceptMeta(&taskMetaInfo)

// 新生成的任务加入到map队列
c.MapTaskChannel <- &task
}
}

3.10 coordinator初始化reduce队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// makeReduceTasks 初始化reduce队列
func (c *Coordinator) makeReduceTasks() {
// map任务->reduce队列
for i := 0; i < c.ReducerNum; i++ {
id := c.generateTaskId()
task := Task{
TaskId: id,
TaskType: ReduceTask,
FileSlice: selectReduceName(i),
}

// 保存任务的初始状态
taskMetaInfo := TaskMetaInfo{
state: Waiting, // 任务等待被执行
TaskAdr: &task, // 保存任务的地址
}
c.taskMetaHolder.acceptMeta(&taskMetaInfo)

// 新生成的任务加入到map队列
c.ReduceTaskChannel <- &task
}
}

3.11 补充:crash test?

这一部分是应对当worker崩溃时候的措施。循环检查当前任务状态,如果超时就将任务收回,重新加入到队列中去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// CrashDetector crash检测
func (c *Coordinator) CrashDetector() {
for {
time.Sleep(time.Second * 2)
mu.Lock()
if c.DistPhase == AllDone {
mu.Unlock()
break
}

for _, v := range c.taskMetaHolder.MetaMap {
if v.state == Working {
//fmt.Println("task[", v.TaskAdr.TaskId, "] is working: ", time.Since(v.StartTime), "s")
}

if v.state == Working && time.Since(v.StartTime) > 9*time.Second {
fmt.Printf("the task[ %d ] is crash,take [%d] s\n", v.TaskAdr.TaskId, time.Since(v.StartTime))

switch v.TaskAdr.TaskType {
case MapTask:
c.MapTaskChannel <- v.TaskAdr
v.state = Waiting
case ReduceTask:
c.ReduceTaskChannel <- v.TaskAdr
v.state = Waiting

}
}
}
mu.Unlock()
}
}

写在后面

大概主要的函数都写在上面了,有一些简单的函数没体现在上面,不过大致思路弄明白了相信就能轻松实现了。这一个lab大概明白了分布式的一个基本思路,不过还没接触到真正分布式的东西,接着往后学吧。