mit6.5840 lab2B:Raft日志同步

本文最后更新于:6 个月前

mit6.5840 lab2B:Raft日志同步

写在前面

这一节实现上一节中未实现的日志增量内容,去看2b的test可以发现其实是调用raft中的start函数,对leader节点写入log,然后检测log是否成功其实就是通过applyChan协程一直检测,可以自己多去看看test的源码。然后具体的代码编写、字段其实paper中也提到了,包括一些实现的细节也在figure中有提到。

为减少代码冗余,本文只描述新增加或修改的代码。

正文

1 Raft结构体新增applyChan

1
applyChan chan ApplyMsg // 日志存通道里(2B)

2 ticker:初始日志同步

对比2A增加了实现发送初始日志包。nextIndex代表的其实是代表下一次发送的日志的index代表哪里,paper中的表格也有体现。因此剪切发送的log也是根据这个来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 领导者发送日志同步
case Leader:
appendNums := 1 // 对于正确返回的节点数量
rf.timer.Reset(HeartBeatTimeout)

// 对自身以外的节点发送心跳同步
for i := 0; i < len(rf.peers); i++ {
// 遍历到自己本身 跳过
if i == rf.me {
continue
}

// 初始化appendEntries消息结构体
appendEntriesArgs := AppendEntriesArgs{
Term: rf.currentTerm,
LeaderId: rf.me,
PrevLogIndex: 0,
PrevLogTerm: 0,
Entries: nil,
LeaderCommit: rf.commitIndex,
}

// 定义reply消息结构体
appendEntriesReply := AppendEntriesReply{}

// 如果nextIndex[i]长度不等于rf.logs,代表与leader的log entries不一致,需要附带过去
appendEntriesArgs.Entries = rf.logs[rf.nextIndex[i]-1:]

// 代表已经不是初始值0
if rf.nextIndex[i] > 0 {
appendEntriesArgs.PrevLogIndex = rf.nextIndex[i] - 1
}

if appendEntriesArgs.PrevLogIndex > 0 {
appendEntriesArgs.PrevLogTerm = rf.logs[appendEntriesArgs.PrevLogIndex-1].Term
}

// 起线程发送request
go rf.sendAppendEntries(i, &appendEntriesArgs, &appendEntriesReply, &appendNums)
}
}

3 日志增量 rpc构造

3.1 增加两种追加日志状态

1
2
3
4
5
6
7
const (
AppNormal AppendEntriesState = iota // 追加正常
AppOutOfDate // 追加过时
AppKilled // Raft程序终止
AppCommitted // 追加的日志已经提交
Mismatch // 追加不匹配
)

3.2 AppendEntriesReply新增UpNextIndex

1
2
3
4
5
6
type AppendEntriesReply struct {
Term int // leader的term可能是过时的,此时收到的Term用于更新他自己
Success bool // 如果follower与Args中的PreLogIndex/PreLogTerm都匹配才会接过去新的日志(追加),不匹配直接返回false
AppState AppendEntriesState // 追加状态
UpNextIndex int // 用于更新请求节点的nextIndex[i]
}

3.3 修改AppendEntries

有两个情况会导致conflict:

  • 如果preLogIndex大于当前日志的最大的下标说明跟随者缺失日志,拒绝附加日志
  • 如果preLog的任期和preLogIndex处的任期和preLogTerm不相等,那么说明日志存在conflict, 拒绝附加日志

追加到自身后,要记得进行将该log提交至chan中。也因此是应该追加的更新逻辑是,追加到自身rf.logs中后你需要先更新自身的commitIndex至追加后的长度,但是这时还没apply, lastApplied应该是真正提交到chan的下标,等commit更新后去提交到chan里再更新apply。这也是容易混乱的点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// AppendEntries 建立心跳、同步日志RPC
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
// 加锁
rf.mu.Lock()
defer rf.mu.Unlock()

// 节点crash
if rf.killed() {
reply.AppState = AppKilled
reply.Term = -1
reply.Success = false
return
}

// 出现网络分区,args的任期,比当前raft的任期还小,说明args之前所在的分区已经OutOfDate
if args.Term < rf.currentTerm {
reply.AppState = AppOutOfDate
reply.Term = rf.currentTerm
reply.Success = false
return
}

// 出现conflict的情况
// paper:Reply false if log doesn’t contain an entry at prevLogIndex,whose term matches prevLogTerm (§5.3)
// 首先要保证自身len(rf)大于0否则数组越界
// 1、 如果preLogIndex的大于当前日志的最大的下标说明跟随者缺失日志,拒绝附加日志
// 2、 如果preLog出`的任期和preLogIndex处的任期和preLogTerm不相等,那么说明日志存在conflict,拒绝附加日志
if args.PrevLogIndex > 0 && (len(rf.logs) < args.PrevLogIndex || rf.logs[args.PrevLogIndex-1].Term != args.PrevLogTerm) {
reply.AppState = Mismatch
reply.Term = rf.currentTerm
reply.Success = false
reply.UpNextIndex = rf.lastApplied + 1
return
}

// 如果当前节点提交的Index比传过来的还高,说明当前节点的日志已经超前,需返回过去
if args.PrevLogIndex != -1 && rf.lastApplied > args.PrevLogIndex {
reply.AppState = AppCommitted
reply.Term = rf.currentTerm
reply.Success = false
reply.UpNextIndex = rf.lastApplied + 1
return
}

// 对当前的rf进行ticker重置
rf.currentTerm = args.Term
rf.votedFor = args.LeaderId
rf.status = Follower
rf.timer.Reset(rf.overtime)

// 对返回的reply进行赋值
reply.AppState = AppNormal
reply.Term = rf.currentTerm
reply.Success = true

// 如果存在日志包那么进行追加
if args.Entries != nil {
rf.logs = rf.logs[:args.PrevLogIndex]
rf.logs = append(rf.logs, args.Entries...)
}

// 将日志提交至与Leader相同
for rf.lastApplied < args.LeaderCommit {
rf.lastApplied++
applyMsg := ApplyMsg{
CommandValid: true,
CommandIndex: rf.lastApplied,
Command: rf.logs[rf.lastApplied-1].Command,
}
rf.applyChan <- applyMsg
rf.commitIndex = rf.lastApplied
}

return
}

3.4 sendAppendEntries新增三种状态处理

对于一轮发送的日志请求,与vote一致,超过半数节点就可以更新自身的commitIndex,并apply至客户端,对于一些返回的任期比自己大的reply那么说明肯定是经历了网络分区,需要进行投票重排。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// 目标节点正常返回
case AppNormal:
{
// 2A的test目的是让Leader能不能连续任期,所以2A只需要对节点初始化然后返回就好
// 2B需要判断返回的节点是否超过半数commit,才能将自身commit
if reply.Success && reply.Term == rf.currentTerm && *appendNums <= len(rf.peers)/2 {
*appendNums++
}

// 说明返回的值已经大过了自身数组
if rf.nextIndex[server] > len(rf.logs)+1 {
return
}

rf.nextIndex[server] += len(args.Entries)
if *appendNums > len(rf.peers)/2 {
// 保证幂等性,不会提交第二次
*appendNums = 0

if len(rf.logs) == 0 || rf.logs[len(rf.logs)-1].Term != rf.currentTerm {
return
}

for rf.lastApplied < len(rf.logs) {
rf.lastApplied++
applyMsg := ApplyMsg{
CommandValid: true,
Command: rf.logs[rf.lastApplied-1].Command,
CommandIndex: rf.lastApplied,
}
rf.applyChan <- applyMsg
rf.commitIndex = rf.lastApplied
}
}
return
}

case Mismatch:
if args.Term != rf.currentTerm {
return
}
rf.nextIndex[server] = reply.UpNextIndex

case AppCommitted:
if args.Term != rf.currentTerm {
return
}
rf.nextIndex[server] = reply.UpNextIndex
}

运行结果

图为测试程序运行结果

难度非常大的一节,代码量稍微上去了一些,主要难点在于排错…