mit6.5840 lab2A:Raft领导选举

本文最后更新于:6 个月前

mit6.5840 lab2A:Raft领导选举

写在前面

一晃快五月了,加快进度。raft和lab1的MapReduce是两个不同的框架,其优点在于能够提高容错性,具体可以参考论文以及B站的视频加深理解。这一节主要实现领导选举。

正文

1 定义消息结构体

对照着论文写一下结构体结构。

1.1 Raft结点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// Raft
// A Go object implementing a single Raft peer.
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
dead int32 // set by Kill()

// Your data here (2A, 2B, 2C).
// Look at the paper's Figure 2 for a description of what
// state a Raft server must maintain.

// 所有的servers拥有的变量:
currentTerm int // 记录当前的任期
votedFor int // 记录当前的任期把票投给了谁
logs []LogEntry // 日志条目数组,包含了状态机要执行的指令集,以及收到领导时的任期号

// 所有的servers经常修改的:
// 正常情况下commitIndex与lastApplied应该是一样的,但是如果有一个新的提交,并且还未应用的话last应该要更小些
commitIndex int // 状态机中已知的被提交的日志条目的索引值(初始化为0,持续递增)
lastApplied int // 最后一个被追加到状态机日志的索引值

// leader拥有的可见变量,用来管理他的follower(leader经常修改的)
// nextIndex与matchIndex初始化长度应该为len(peers),Leader对于每个Follower都记录他的nextIndex和matchIndex
// nextIndex指的是下一个的appendEntries要从哪里开始
// matchIndex指的是已知的某follower的log与leader的log最大匹配到第几个Index,已经apply
nextIndex []int // 对于每一个server,需要发送给他下一个日志条目的索引值(初始化为leader日志index+1,那么范围就对标len)
matchIndex []int // 对于每一个server,已经复制给该server的最后日志条目下标

// 由自己追加的:
status Status // 该节点是什么角色(状态)
overtime time.Duration // 设置超时时间
timer *time.Ticker // 每个节点中的计时器

applyChan chan ApplyMsg // 日志都是存在这里client取(2B)
}

1.2 RequestVote:竞选者和跟随者的通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// RequestVoteArgs
// example RequestVote RPC arguments structure.
// field names must start with capital letters!
type RequestVoteArgs struct {
// Your data here (2A, 2B).
Term int // 需要竞选的人的任期
CandidateId int // 需要竞选的人的Id
LastLogIndex int // 竞选人日志条目最后索引
LastLogTerm int // 候选人最后日志条目的任期号
}

// RequestVoteReply
// example RequestVote RPC reply structure.
// field names must start with capital letters!
// 如果竞选者任期比自己的任期还短,那就不投票,返回false
// 如果当前节点的votedFor为空,且竞选者的日志条目跟收到者的一样新则把票投给该竞选者
type RequestVoteReply struct {
// Your data here (2A).
Term int // 投票方的term,如果竞选者比自己还低就改为这个
VoteGranted bool // 是否投票给了该竞选人
VoteState VoteState // 投票状态
}

1.3 AppendEntries:领导者和跟随者的通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// AppendEntriesArgs 由leader复制log条目,也可以当做是心跳连接,注释中的rf为leader节点
type AppendEntriesArgs struct {
Term int // leader的任期
LeaderId int // leader自身的ID
PrevLogIndex int // 预计要从哪里追加的index,因此每次要比当前的len(logs)多1 args初始化为:rf.nextIndex[i] - 1
PrevLogTerm int // 追加新的日志的任期号(这边传的应该都是当前leader的任期号 args初始化为:rf.currentTerm
Entries []LogEntry // 预计存储的日志(为空时就是心跳连接)
LeaderCommit int // leader的commit index指的是最后一个被大多数机器都复制的日志Index
}

type AppendEntriesReply struct {
Term int // leader的term可能是过时的,此时收到的Term用于更新他自己
Success bool // 如果follower与Args中的PreLogIndex/PreLogTerm都匹配才会接过去新的日志(追加),不匹配直接返回false
AppState AppendEntriesState // 追加状态
}

2 Make初始化节点

注意在初始化raft后要起一个协程开启心跳机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func Make(peers []*labrpc.ClientEnd, me int, persister *Persister, applyCh chan ApplyMsg) *Raft {
rf := &Raft{}
rf.peers = peers
rf.persister = persister
rf.me = me

// Your initialization code here (2A, 2B, 2C).

// 对应论文中的初始化
rf.applyChan = applyCh //2B

rf.currentTerm = 0
rf.votedFor = -1
rf.logs = make([]LogEntry, 0)

rf.commitIndex = 0
rf.lastApplied = 0

rf.nextIndex = make([]int, len(peers))
rf.matchIndex = make([]int, len(peers))

rf.status = Follower
rf.overtime = time.Duration(150+rand.Intn(150)) * time.Millisecond // 随机产生150-300ms
rf.timer = time.NewTicker(rf.overtime)

// initialize from state persisted before a crash
rf.readPersist(persister.ReadRaftState())

// start ticker goroutine to start elections
go rf.ticker()

return rf
}

3 ticker心跳

要时刻监听是否发生超时,如果超时按当前节点身份做分情况讨论。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
func (rf *Raft) ticker() {
for rf.killed() == false {
// Your code here to check if a leader election should
// be started and to randomize sleeping time using

// 当定时器结束进行超时选举
select {
// 监听到发生超时
case <-rf.timer.C:
if rf.killed() {
return
}
rf.mu.Lock()
// 根据自身的status进行一次ticker
switch rf.status {
// 跟随者变成候选者
case Follower:
rf.status = Candidate
fallthrough
// 候选者发送投票请求
case Candidate:
// 初始化自身的任期、并把票投给自己
rf.currentTerm += 1
rf.votedFor = rf.me
votedNums := 1 // 统计自身的票数

// 每轮选举开始时,重新设置选举超时
rf.overtime = time.Duration(150+rand.Intn(150)) * time.Millisecond // 随机产生150-300ms
rf.timer.Reset(rf.overtime)

// 对自身以外的节点进行选举
for i := 0; i < len(rf.peers); i++ {
// 遍历到自己本身 跳过
if i == rf.me {
continue
}
// 初始化RequestVote消息结构体
voteArgs := RequestVoteArgs{
Term: rf.currentTerm,
CandidateId: rf.me,
LastLogIndex: len(rf.logs) - 1,
LastLogTerm: 0,
}
// 如果日志数组里面有内容
// 将任期号同步
if len(rf.logs) > 0 {
voteArgs.LastLogTerm = rf.logs[len(rf.logs)-1].Term
}
// 定义reply消息结构体
voteReply := RequestVoteReply{}
// 起线程发送request
go rf.sendRequestVote(i, &voteArgs, &voteReply, &votedNums)
}
// 领导者发送日志同步
case Leader:
appendNums := 1 // 对于正确返回的节点数量
rf.timer.Reset(HeartBeatTimeout)
// 对自身以外的节点发送心跳同步
for i := 0; i < len(rf.peers); i++ {
// 遍历到自己本身 跳过
if i == rf.me {
continue
}
// 初始化appendEntries消息结构体
appendEntriesArgs := AppendEntriesArgs{
Term: rf.currentTerm,
LeaderId: rf.me,
PrevLogIndex: 0,
PrevLogTerm: 0,
Entries: nil,
LeaderCommit: rf.commitIndex,
}
// 定义reply消息结构体
appendEntriesReply := AppendEntriesReply{}
// 起线程发送request
go rf.sendAppendEntries(i, &appendEntriesArgs, &appendEntriesReply, &appendNums)
}
}
rf.mu.Unlock()
}
}
}

4 RequestVote:候选者和其他节点通信的两个函数

4.1 sendRequestVote:候选者向其他节点发送请求投票消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply, voteNums *int) bool {
// 如果节点已经删除,返回false
if rf.killed() {
return false
}

// 调Call方法发送消息
ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
for !ok {
// 失败重传
if rf.killed() {
return false
}
ok = rf.peers[server].Call("Raft.RequestVote", args, reply)
}

// 加锁
rf.mu.Lock()
defer rf.mu.Unlock()

// 由于网络分区,会导致投票的人的term的比自己的还小,不给予投票
if args.Term < rf.currentTerm {
return false
}

// 对reply的返回情况进行分支处理
switch reply.VoteState {

// 消息过期有两种情况:
// 1.是本身的term过期了比节点的还小
// 2.是节点日志的条目落后于节点了
case Expire:
{
rf.status = Follower
rf.timer.Reset(rf.overtime)
if reply.Term > rf.currentTerm {
rf.currentTerm = reply.Term
rf.votedFor = -1
}
}

case Normal, Voted:
// 根据是否同意投票,收集选票数量
if reply.VoteGranted && reply.Term == rf.currentTerm && *voteNums <= (len(rf.peers)/2) {
*voteNums++
}

// 票数超过一半
if *voteNums >= (len(rf.peers)/2)+1 {
*voteNums = 0
// 本身就是leader在网络分区中更具有话语权的leader
if rf.status == Leader {
return ok
}

// 本身不是leader,那么需要初始化nextIndex数组
rf.status = Leader
rf.nextIndex = make([]int, len(rf.peers))
for i, _ := range rf.nextIndex {
rf.nextIndex[i] = len(rf.logs) + 1
}

rf.timer.Reset(HeartBeatTimeout)
}

case Killed:
return false
}

return ok
}

4.2 RequestVote:收到投票请求的节点向候选者传回reply

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
// Your code here (2A, 2B).
// 加锁
rf.mu.Lock()
defer rf.mu.Unlock()

// 当前节点crash
if rf.killed() {
reply.VoteState = Killed
reply.Term = -1
reply.VoteGranted = false
return
}

// 出现网络分区,该竞选者已经OutOfDate(过时)
if args.Term < rf.currentTerm {
reply.VoteState = Expire
reply.Term = rf.currentTerm
reply.VoteGranted = false
return
}

// 自身相对于竞选者过时,先重置自身状态
if args.Term > rf.currentTerm {
rf.status = Follower
rf.currentTerm = args.Term
rf.votedFor = -1
}

// 此时比自己任期小的都已经把票还原
if rf.votedFor == -1 {
currentLogIndex := len(rf.logs) - 1
currentLogTerm := 0
// 如果currentLogIndex下标不是-1就把term赋值过来
if currentLogIndex >= 0 {
currentLogTerm = rf.logs[currentLogIndex].Term
}

// 论文里的第二个匹配条件,当前peer要符合arg两个参数的预期
if args.LastLogIndex < currentLogIndex || args.LastLogTerm < currentLogTerm {
reply.VoteState = Expire
reply.VoteGranted = false
reply.Term = rf.currentTerm
return
}

// 给票数,并且返回true
rf.votedFor = args.CandidateId
reply.VoteState = Normal
reply.Term = rf.currentTerm
reply.VoteGranted = true

} else { // 只剩下任期相同,但是票已经给了,此时存在两种情况
// 初始化出来reply,去除代码冗余
reply.VoteState = Voted
reply.VoteGranted = false

// 1、当前的节点是来自同一轮,不同竞选者的,但是票数已经给了(又或者它本身自己就是竞选者)
if rf.votedFor != args.CandidateId {
// 告诉reply票已经没了返回false
return
} else { // 2. 当前的节点票已经给了同一个人了,但是由于sleep等网络原因,又发送了一次请求
// 重置自身状态
rf.status = Follower
}

}
// 重置时间
rf.timer.Reset(rf.overtime)
return
}

5 AppendEntries:领导者和跟随者通信的两个函数

5.1 sendAppendEntries:领导者向其他节点发送日志消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// leader向其他节点发送日志消息
func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply, appendNums *int) bool {
// 如果节点已经删除,返回false
if rf.killed() {
return false
}

// paper中5.3节第一段末尾提到,如果append失败应该不断的重传,直到这个log成功的被store
ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
for !ok {
if rf.killed() {
return false
}
ok = rf.peers[server].Call("Raft.AppendEntries", args, reply)
}

// 必须在加在这里否则前面重传时,RPC也需要一个锁,但是又获取不到,因为锁已经被加上了
rf.mu.Lock()
defer rf.mu.Unlock()

// 对reply的返回状态进行分支
switch reply.AppState {

// 目标节点crash
case AppKilled:
{
return false
}

// 目标节点正常返回
case AppNormal:
{
// 2A的test目的是让Leader能不能连续任期,所以2A只需要对节点初始化然后返回就好
return true
}

// 出现网络分区,该Leader已经OutOfDate(过时)
case AppOutOfDate:
// 该节点变成追随者,并重置rf状态
rf.status = Follower
rf.votedFor = -1
rf.timer.Reset(rf.overtime)
rf.currentTerm = reply.Term

}

return ok
}

5.2 AppendEntries:收到同步请求的节点向领导者传回reply

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// AppendEntries 建立心跳、同步日志RPC
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
// 加锁
rf.mu.Lock()
defer rf.mu.Unlock()

// 节点crash
if rf.killed() {
reply.AppState = AppKilled
reply.Term = -1
reply.Success = false
return
}

// 出现网络分区,args的任期,比当前raft的任期还小,说明args之前所在的分区已经OutOfDate
if args.Term < rf.currentTerm {
reply.AppState = AppOutOfDate
reply.Term = rf.currentTerm
reply.Success = false
return
}

// 对当前的rf进行ticker重置
rf.currentTerm = args.Term
rf.votedFor = args.LeaderId
rf.status = Follower
rf.timer.Reset(rf.overtime)

// 对返回的reply进行赋值
reply.AppState = AppNormal
reply.Term = rf.currentTerm
reply.Success = true
return
}

6 GetState:返回当前节点状态

测试程序会调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (rf *Raft) GetState() (int, bool) {
// 加锁
rf.mu.Lock()
defer rf.mu.Unlock()

// 返回值
var term int // 当前节点term
var isleader bool // 是否是leader

// Your code here (2A).
term = rf.currentTerm
if rf.status == Leader {
isleader = true
} else {
isleader = false
}
return term, isleader
}

运行结果

图为测试程序运行结果