MIT 6.5840 Lab 2C: Raft Persistence

Last updated: 6 months ago

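Preface

This part implements persistence of Raft state, so that a node can restore its previous state after the machine crashes. Three variables have to be persisted: currentTerm, votedFor, and log[]. Whenever any of these three changes on a Raft node, simply call persist right away.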

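The Lab 2C handout also asks for the optimization described in the paper from the bottom of page 7 to the top of page 8. The idea is that when an AppendEntries RPC hits a conflict, the follower returns the conflicting index, or the conflicting term together with an index, and the leader uses that information to update nextIndex[i] in one step instead of backing up one entry per RPC. This reduces the number of RPCs. The Figure 8 (unreliable) test in 2C depends on this optimization: it effectively checks whether you need too many RPCs to reach agreement.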
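The backup optimization described above can be sketched roughly as follows. This is a minimal illustration rather than the code used in this post: the names conflictHint, followerHint, leaderNextIndex, and makeLog are hypothetical, and the log is assumed to keep a dummy entry at index 0.

```go
package main

import "fmt"

// LogEntry is a simplified log record; index 0 is assumed to hold a dummy entry.
type LogEntry struct {
	Term int
}

// conflictHint is what a follower sends back when it rejects AppendEntries.
// The field names are illustrative, not taken from the lab skeleton.
type conflictHint struct {
	ConflictTerm  int // term of the conflicting entry, or -1 if the log is too short
	ConflictIndex int // first index the follower holds for ConflictTerm, or len(log)
}

// followerHint builds the hint for a rejected AppendEntries with the given prevLogIndex.
func followerHint(log []LogEntry, prevLogIndex int) conflictHint {
	if prevLogIndex >= len(log) {
		// The follower has no entry at prevLogIndex at all.
		return conflictHint{ConflictTerm: -1, ConflictIndex: len(log)}
	}
	term := log[prevLogIndex].Term
	first := prevLogIndex
	// Walk back to the first entry of the conflicting term.
	for first > 1 && log[first-1].Term == term {
		first--
	}
	return conflictHint{ConflictTerm: term, ConflictIndex: first}
}

// leaderNextIndex picks the follower's new nextIndex from the hint.
func leaderNextIndex(log []LogEntry, h conflictHint) int {
	if h.ConflictTerm == -1 {
		return h.ConflictIndex
	}
	// If the leader has entries from ConflictTerm, resume just past the last one.
	for i := len(log) - 1; i > 0; i-- {
		if log[i].Term == h.ConflictTerm {
			return i + 1
		}
	}
	// Otherwise skip the follower's whole conflicting term.
	return h.ConflictIndex
}

// makeLog builds a log whose entries carry the given terms.
func makeLog(terms ...int) []LogEntry {
	log := make([]LogEntry, len(terms))
	for i, t := range terms {
		log[i] = LogEntry{Term: t}
	}
	return log
}

func main() {
	leader := makeLog(0, 1, 1, 4, 4, 5)
	follower := makeLog(0, 1, 1, 2, 2, 2, 3)
	hint := followerHint(follower, 5) // leader sent prevLogIndex = 5
	fmt.Printf("hint=%+v nextIndex=%d\n", hint, leaderNextIndex(leader, hint))
}
```

With this scheme the leader skips a whole conflicting term per rejected RPC, instead of decrementing nextIndex by one each time.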

Main text

1 persister.go: the persistence layer

The provided code already contains a very detailed example, so this part is fairly simple.

package raft

//
// support for Raft and kvraft to save persistent
// Raft state (log &c) and k/v server snapshots.
//
// we will use the original persister.go to test your code for grading.
// so, while you can modify this code to help you debug, please
// test with the original before submitting.
//

import "sync"

type Persister struct {
	mu        sync.Mutex
	raftstate []byte
	snapshot  []byte
}

func MakePersister() *Persister {
	return &Persister{}
}

func clone(orig []byte) []byte {
	x := make([]byte, len(orig))
	copy(x, orig)
	return x
}

func (ps *Persister) Copy() *Persister {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	np := MakePersister()
	np.raftstate = ps.raftstate
	np.snapshot = ps.snapshot
	return np
}

func (ps *Persister) ReadRaftState() []byte {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	return clone(ps.raftstate)
}

func (ps *Persister) RaftStateSize() int {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	return len(ps.raftstate)
}

func (ps *Persister) SaveRaftState(state []byte) {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	ps.raftstate = clone(state)
}

// Save both Raft state and K/V snapshot as a single atomic action,
// to help avoid them getting out of sync.
func (ps *Persister) Save(raftstate []byte, snapshot []byte) {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	ps.raftstate = clone(raftstate)
	ps.snapshot = clone(snapshot)
}

func (ps *Persister) ReadSnapshot() []byte {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	return clone(ps.snapshot)
}

func (ps *Persister) SnapshotSize() int {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	return len(ps.snapshot)
}

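2 persist() and readPersist(): encoding and decoding

One thing worth pointing out: make sure the variables you decode match the ones you encoded (same types, same order). Otherwise decode cannot read the values back and reports a decode error. This pitfall cost me two or three hours.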

// Note: this snippet needs "bytes" and "fmt", plus the lab's labgob package,
// in the import list of raft.go.

// save Raft's persistent state to stable storage,
// where it can later be retrieved after a crash and restart.
// see paper's Figure 2 for a description of what should be persistent.
func (rf *Raft) persist() {
	// Your code here (2C).
	// Example:
	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	// The encode order here must match the decode order in readPersist.
	e.Encode(rf.currentTerm)
	e.Encode(rf.votedFor)
	e.Encode(rf.logs)
	data := w.Bytes()
	rf.persister.SaveRaftState(data)
}

// restore previously persisted state.
func (rf *Raft) readPersist(data []byte) {
	if data == nil || len(data) < 1 { // bootstrap without any state?
		return
	}
	// Your code here (2C).
	// Example:
	r := bytes.NewBuffer(data)
	d := labgob.NewDecoder(r)
	// Decode into locals first, so a failed decode leaves rf untouched.
	var currentTerm int
	var votedFor int
	var logs []LogEntry
	if d.Decode(&currentTerm) != nil ||
		d.Decode(&votedFor) != nil ||
		d.Decode(&logs) != nil {
		fmt.Println("decode error")
	} else {
		rf.currentTerm = currentTerm
		rf.votedFor = votedFor
		rf.logs = logs
	}
}
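To make the encode/decode pitfall concrete, here is a small standalone sketch. It uses Go's standard encoding/gob directly (labgob is a wrapper around it) so that it runs outside the lab. The helper names encodeState, decodeState, and decodeWrongOrder are hypothetical, and Command is simplified to a string as an assumption.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// LogEntry is a simplified entry; Command is a string here for illustration only.
type LogEntry struct {
	Term    int
	Command string
}

// encodeState writes term, votedFor, and logs, in that order.
func encodeState(term, votedFor int, logs []LogEntry) ([]byte, error) {
	var buf bytes.Buffer
	enc := gob.NewEncoder(&buf)
	for _, v := range []interface{}{term, votedFor, logs} {
		if err := enc.Encode(v); err != nil {
			return nil, err
		}
	}
	return buf.Bytes(), nil
}

// decodeState reads the values back in exactly the order they were written.
func decodeState(data []byte) (term, votedFor int, logs []LogEntry, err error) {
	dec := gob.NewDecoder(bytes.NewReader(data))
	if err = dec.Decode(&term); err != nil {
		return
	}
	if err = dec.Decode(&votedFor); err != nil {
		return
	}
	err = dec.Decode(&logs)
	return
}

// decodeWrongOrder tries to read the log first, although the term was written first.
func decodeWrongOrder(data []byte) error {
	dec := gob.NewDecoder(bytes.NewReader(data))
	var logs []LogEntry
	return dec.Decode(&logs)
}

func main() {
	data, err := encodeState(3, -1, []LogEntry{{Term: 0}, {Term: 3, Command: "x"}})
	if err != nil {
		fmt.Println("encode error:", err)
		return
	}
	term, votedFor, logs, err := decodeState(data)
	fmt.Println(term, votedFor, len(logs), err)
	// Decoding with a mismatched type or order is reported as an error.
	fmt.Println("wrong order fails:", decodeWrongOrder(data) != nil)
}
```

Keeping the encode and decode sequences next to each other, as in the two helpers here, makes a mismatch easier to spot during review.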

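3 Handling Mismatch and AppCommitted

In both the append-mismatch (Mismatch) and already-committed (AppCommitted) cases, if the reply carries a higher term, the node has to step down to follower and reset its Raft state (vote, election timer, current term). Unlike the stale-term case, we also set the follower's nextIndex to the latest known position, i.e. reply.UpNextIndex.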

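case Mismatch, AppCommitted:
	// A higher term in the reply means this leader is stale: step down.
	if reply.Term > rf.currentTerm {
		rf.status = Follower
		rf.votedFor = -1
		rf.timer.Reset(rf.overtime)
		rf.currentTerm = reply.Term
		// currentTerm and votedFor changed, so persist immediately.
		rf.persist()
	}
	// Jump nextIndex straight to the position suggested by the follower.
	rf.nextIndex[server] = reply.UpNextIndex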

4 Calling persist() in the right places

As the paper describes, call persist whenever any of the three pieces of state, currentTerm, votedFor, or log[], changes.
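One way to keep these calls from being forgotten is to funnel every change to the three fields through small helpers that persist before returning. The sketch below is only an illustration under that assumption: the node type, its helpers, and the saves counter (standing in for the real Persister) are all hypothetical.

```go
package main

import "fmt"

// LogEntry is a simplified entry used only for this illustration.
type LogEntry struct {
	Term    int
	Command string
}

// node holds just the three persistent fields; saves counts persist() calls
// and stands in for writing to the real Persister.
type node struct {
	currentTerm int
	votedFor    int
	logs        []LogEntry
	saves       int
}

func (n *node) persist() { n.saves++ }

// stepDown adopts a newer term and clears the vote, then persists both.
func (n *node) stepDown(term int) {
	if term <= n.currentTerm {
		return // nothing persistent changed
	}
	n.currentTerm = term
	n.votedFor = -1
	n.persist()
}

// grantVote records the vote for this term and persists it.
func (n *node) grantVote(candidate int) {
	if n.votedFor == candidate {
		return // vote already recorded
	}
	n.votedFor = candidate
	n.persist()
}

// appendEntries extends the log and persists only if something was added.
func (n *node) appendEntries(entries ...LogEntry) {
	if len(entries) == 0 {
		return // heartbeat: the log is unchanged
	}
	n.logs = append(n.logs, entries...)
	n.persist()
}

func main() {
	n := &node{votedFor: -1}
	n.stepDown(2)
	n.grantVote(1)
	n.appendEntries(LogEntry{Term: 2, Command: "x"})
	n.appendEntries() // empty heartbeat, no persist
	fmt.Println("persist calls:", n.saves)
}
```

In the real lab these helpers would run while holding the Raft mutex, and the state has to reach stable storage before the node replies to the RPC that caused the change (Figure 2 of the paper).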