MIT 6.824 Raft
Lab2A Leader election
First a leader is elected, and the leader manages the replicated log. It accepts log entries from clients, replicates them on the other servers, and tells each server when it is safe to apply the entries to its own state machine.
The leader must accept new log entries from clients and see that they are replicated.
Safety: at a given log index, at most one distinct entry may appear across the servers' logs.
5.1 Raft basics
At the start every server is a follower; after an election timeout it starts an election and becomes a candidate. Then: (1) if it has not received a majority of votes by the next timeout, it runs another election; (2) if it receives a majority of votes, it becomes the leader.
Time is divided into terms. Each term begins with an election; if no leader emerges (a split vote), the term ends and a new one begins.
Every server stores the current term number (which grows monotonically over time). Whenever servers communicate, they exchange current terms. A candidate or leader that discovers its own term is stale immediately reverts to follower. A server that receives a request carrying a stale term number rejects it.
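As a sketch, every RPC handler and reply path can funnel through a single term check. The helper below is illustrative only (it is not part of the lab skeleton), and assumes it is called with rf.mu held:

// Illustrative helper (not from the lab skeleton): apply Raft's term-exchange
// rule. Caller must hold rf.mu. Returns false if the peer's term is stale.
func (rf *Raft) observeTerm(peerTerm int) bool {
    if peerTerm < rf.currentTerm {
        return false // the request or reply carries a stale term: reject it
    }
    if peerTerm > rf.currentTerm {
        rf.currentTerm = peerTerm // our term is out of date: adopt the newer one
        rf.role = FOLLOWER        // a candidate or leader reverts to follower
        rf.votedFor = -1          // we have not yet voted in this new term
    }
    return true
}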
5.2 Leader Election
At the start a server is a follower, and it stays in that state as long as it keeps receiving RPCs from a leader or candidate. The leader sends periodic heartbeats (AppendEntries RPCs carrying no log entries) to all followers to maintain its authority.
If a follower receives no communication for an entire election timeout, it starts a leader election.
To begin an election, the follower increments its term number and transitions to the candidate state. It votes for itself and sends RequestVote RPCs to the other servers in the cluster. A candidate leaves that state in one of three situations:
- it wins the election
- another server wins the election
- there is no winner
The three cases in detail:
- Winning the election means collecting votes from a majority of the cluster in the same term. A server may vote for at most one candidate per term (granted first-come-first-served), which guarantees at most one winner per term. Having won, the candidate sends heartbeats to suppress further elections.
- While waiting for votes, a candidate may receive an AppendEntries RPC from another server claiming to be leader. It compares term numbers: if the call carries a term at least as large as the candidate's current term (>=), the candidate accepts the leader and reverts to follower; otherwise it rejects the RPC and remains a candidate.
- If no candidate collects enough votes, each candidate times out, increments its term number, and starts a new round of voting.
Raft uses randomized election timeouts to make split votes rare.
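For example (a two-line sketch; the 400~1400ms range matches the following() code later in this post rather than the paper's 150~300ms):

// Each wait uses a fresh random timeout, so peers rarely time out together.
timeout := time.Duration(400+rand.Intn(1000)) * time.Millisecond
timer := time.NewTimer(timeout)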
Raft node data structures
// Role is the server's role
type Role int
const (
FOLLOWER Role = iota
CANDIDATE
LEADER
)
type LogEntry struct {
Command interface{}
Term int // the term in which this entry was created
}
//
// A Go object implementing a single Raft peer.
//
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
dead int32 // set by Kill()
// Your data here (2A, 2B, 2C).
// Look at the paper's Figure 2 for a description of what
// state a Raft server must maintain.
currentTerm int
votedFor int // candidate ID that received this server's vote in the current term (-1 if none)
log []LogEntry // log entries
role Role
commitIndex int // index of the highest log entry known to be committed
lastApplied int // index of the highest log entry applied to the state machine
nextIndex []int // for each server, index of the next log entry to send
matchIndex []int // for each server, highest log index known to be replicated there
// channels for communication
chAppendEntry chan struct{} // signals receipt of a heartbeat
chRequestVote chan struct{} // signals receipt of a valid RequestVote
}
The GetState function
// return currentTerm and whether this server
// believes it is the leader.
func (rf *Raft) GetState() (int, bool) {
var term int
var isleader bool
// Your code here (2A).
rf.mu.Lock()
term = rf.currentTerm
isleader = (rf.role == LEADER)
rf.mu.Unlock()
return term, isleader
}
The Make function starts a Raft node
func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {
rf := &Raft{}
rf.peers = peers
rf.persister = persister
rf.me = me
// Your initialization code here (2A, 2B, 2C).
rf.currentTerm = 0
rf.votedFor = -1
rf.log = make([]LogEntry, 1, 16) // length 1: index 0 holds a sentinel entry, so len(rf.log)-1 is the last real index
rf.log[0] = LogEntry{struct{}{}, -1}
rf.commitIndex = 0
rf.lastApplied = 0
rf.matchIndex = make([]int, len(rf.peers))
rf.nextIndex = make([]int, len(rf.peers))
rf.chAppendEntry = make(chan struct{})
rf.chRequestVote = make(chan struct{})
rf.role = FOLLOWER
// initialize from state persisted before a crash, before the loop can race with it
rf.readPersist(persister.ReadRaftState())
// start the main serving loop
go rf.startServing()
return rf
}
The main loop: startServing
func (rf *Raft) startServing() {
for {
if rf.killed() {
return
}
rf.mu.Lock()
role := rf.role
rf.mu.Unlock()
switch role { // snapshot the role under the lock to avoid a data race
case FOLLOWER:
rf.following()
case CANDIDATE:
rf.election()
case LEADER:
rf.sendingHeartBeat()
}
}
}
func (rf *Raft) changeRole(role Role) {
rf.mu.Lock()
defer rf.mu.Unlock()
rf.role = role
}
Follower logic
func (rf *Raft) following() {
rand.Seed(time.Now().UnixNano()) // ideally seeded once at startup; UnixNano varies more than whole seconds
electionTimeout := rand.Intn(1000) + 400 // paper suggests 150~300ms; here 400~1400ms to suit the tester's timing
timer := time.NewTimer(time.Millisecond * time.Duration(electionTimeout))
for {
if rf.killed() {
return
}
select {
case <-timer.C:
DPrintf("%d: Eletion Timeout, start election\n", rf.me)
rf.changeRole(CANDIDATE)
return
case <-rf.chAppendEntry:
electionTimeout := rand.Intn(1000) + 400
timer.Reset(time.Millisecond * time.Duration(electionTimeout))
case <-rf.chRequestVote:
DPrintf("%d: recieve the candidates' request for vote\n", rf.me)
electionTimeout := rand.Intn(1000) + 400
timer.Reset(time.Millisecond * time.Duration(electionTimeout))
}
}
}
Candidate logic
func (rf *Raft) election() {
rf.mu.Lock()
rf.currentTerm++
rf.votedFor = rf.me
args := RequestVoteArgs{}
args.CandidateID = rf.me
args.Term = rf.currentTerm
args.LastLogIndex = len(rf.log) - 1
args.LastLogTerm = rf.log[args.LastLogIndex].Term
rf.mu.Unlock() // build args under the lock so Term and the log fields are consistent
voteCount := 0
voteOk := make(chan struct{})
var voteMu sync.Mutex
notified := false // whether voteOk has already been signaled
timer := time.NewTimer(time.Millisecond * time.Duration(rand.Intn(300)+500))
for serverID := range rf.peers {
if serverID != rf.me {
go func(server int) {
reply := RequestVoteReply{}
if ok := rf.sendRequestVote(server, &args, &reply); ok {
if reply.VoteGranted {
voteMu.Lock()
voteCount++
if !notified && voteCount >= len(rf.peers) / 2 { // with our own vote this is a majority
voteOk <- struct{}{} // NB: unbuffered send; Lab 2B switches to buffered channels
notified = true // later votes don't signal again
voteMu.Unlock()
} else {
voteMu.Unlock()
}
}
}
}(serverID)
}
}
select {
case <-voteOk:
DPrintf("%d: Become Leader", rf.me)
rf.changeRole(LEADER)
rf.votedFor = -1
return
case <-rf.chAppendEntry: // another leader has already been elected
rf.changeRole(FOLLOWER)
rf.votedFor = -1
return
case <-rf.chRequestVote: // received a valid RequestVote from another candidate
rf.changeRole(FOLLOWER)
rf.votedFor = -1
DPrintf("Receive others' requestVote")
return
case <-timer.C: // timed out without a majority
rf.changeRole(FOLLOWER)
rf.votedFor = -1
return
}
}
Leader logic
func (rf *Raft) sendingHeartBeat() {
rf.mu.Lock()
lastLogIndex := len(rf.log) - 1
for i := range(rf.peers) {
rf.nextIndex[i] = lastLogIndex + 1
rf.matchIndex[i] = 0
}
args := AppendEntriesArgs{}
args.Term = rf.currentTerm
args.LeaderID = rf.me
args.PrevLogIndex = lastLogIndex // index of log entry immediately preceding new ones
args.PrevLogTerm = rf.log[args.PrevLogIndex].Term
args.Entries = []LogEntry{} // empty log entries
args.LeaderCommit = rf.commitIndex
rf.mu.Unlock()
timer := time.NewTimer(200 * time.Millisecond) // send heartbeats every 200ms
higherTermCh := make(chan struct{}) // unbuffered here; Lab 2B changes this to a buffered channel
gotHigherTerm := false // whether a reply with a higher term has been seen
for {
for serverID := range rf.peers {
if serverID == rf.me {
continue
}
go func(server int) {
reply := AppendEntriesReply{}
if ok := rf.sendAppendEntry(server, &args, &reply); ok {
rf.mu.Lock()
if !gotHigherTerm && reply.Term > rf.currentTerm {
rf.currentTerm = reply.Term
gotHigherTerm = true
higherTermCh <- struct{}{} // NB: unbuffered send while holding rf.mu; Lab 2B buffers this channel
}
rf.mu.Unlock()
}
}(serverID)
}
select {
case <-timer.C:
timer.Reset(time.Duration(time.Millisecond * 200))
case <-higherTermCh: // discover a server with higher term
rf.changeRole(FOLLOWER)
return
case <-rf.chAppendEntry: // received an AppendEntries from a leader with a higher term
rf.changeRole(FOLLOWER)
return
case <-rf.chRequestVote: // received a RequestVote with a higher term
rf.changeRole(FOLLOWER)
return
}
}
}
AppendEntries (the heartbeat-receiving RPC handler)
func (rf *Raft) AppendEntry(args *AppendEntriesArgs, reply *AppendEntriesReply) {
// follower-side handling of incoming heartbeats
rf.mu.Lock()
defer rf.mu.Unlock()
if len(args.Entries) == 0 {
if args.Term < rf.currentTerm { // the heartbeat's term is stale
reply.Success = false
reply.Term = rf.currentTerm
} else {
DPrintf("%d: Got valid heartbeat from %d\n", rf.me, args.LeaderID)
rf.currentTerm = args.Term
reply.Success = true
reply.Term = rf.currentTerm // echo the (possibly updated) term back
go func(){
rf.chAppendEntry <- struct{}{}
}()
}
}
}
RequestVote (handling vote requests)
type RequestVoteArgs struct {
// Your data here (2A, 2B).
Term int // candidate's term
CandidateID int // for now, the index into the peers array serves as the ID
LastLogIndex int // index of the candidate's last log entry
LastLogTerm int
}
//
// example RequestVote RPC reply structure.
// field names must start with capital letters!
//
type RequestVoteReply struct {
// Your data here (2A).
Term int // currentTerm, for candidate to update itself
VoteGranted bool // true means the vote is granted
}
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
// Your code here (2A, 2B).
rf.mu.Lock()
defer rf.mu.Unlock()
if args.Term < rf.currentTerm {
reply.VoteGranted = false
reply.Term = rf.currentTerm
DPrintf("%d: refuse vote to %d\n", rf.me, args.CandidateID)
} else if rf.votedFor == -1 || rf.votedFor == args.CandidateID {
curLogIndex := len(rf.log) - 1
if args.LastLogIndex >= curLogIndex && args.LastLogTerm >= rf.log[curLogIndex].Term { // candidate's log at least as up-to-date as mine (too loose; corrected in Lab 2B below)
DPrintf("%d: Vote to %d\n", rf.me, args.CandidateID)
reply.VoteGranted = true
reply.Term = args.Term
rf.currentTerm = args.Term
rf.votedFor = args.CandidateID
go func() {
rf.chRequestVote <- struct{}{}
}() // notify ourselves that a valid vote request arrived
} else {
reply.VoteGranted = false
// reply.Term = rf.currentTerm
DPrintf("%d: refuese vote to %d, because the old log\n", rf.me, args.CandidateID)
}
} else {
reply.VoteGranted = false
// reply.Term = rf.currentTerm
DPrintf("%d: refuese vote to %d, because has voted->%d.\n", rf.me, args.CandidateID, rf.votedFor)
}
}
Test results
➜ raft git:(master) ✗ go test -run 2A -race
Test (2A): initial election ...
... Passed -- 4.6 3 38 9128 0
Test (2A): election after network failure ...
... Passed -- 7.5 3 86 17588 0
PASS
ok _/Users/sjy/develop/Go/6.824/src/raft 12.486s
Lab2B Log replication
Understanding it from the tests (the point of applyCh: applying log entries to the state machine)
Start from the TestBasicAgree2B test function -> make_config -> start1
start1 writes committed commands into cfg's log (it receives new commands from the applyCh channel)
In TestBasicAgree2B, the nCommitted function inspects cfg's log to check whether a majority of the nodes have reached agreement
So the applyCh created in start1 must be handed to the server node; whenever a new log entry is committed, the server reports it through applyCh
The leader's nextIndex array holds, for each server, the index of the next log entry to send, initialized to the leader's last log index + 1 (i.e. len(log))
The leader's matchIndex holds, for each server, the highest log index known to be replicated there, initialized to 0; the leader keeps checking matchIndex to decide whether commitIndex can advance
Every server keeps checking lastApplied; as soon as commitIndex exceeds lastApplied, it applies the next entry to its state machine through applyCh, as in the sketch below
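A minimal sketch of such an applier loop, assuming the lab skeleton's ApplyMsg fields (CommandValid, Command, CommandIndex); the 10ms polling interval is an arbitrary choice:

// Sketch: apply committed entries in order. Assumes the lab's ApplyMsg struct.
func (rf *Raft) applier(applyCh chan ApplyMsg) {
    for !rf.killed() {
        rf.mu.Lock()
        for rf.lastApplied < rf.commitIndex {
            rf.lastApplied++
            msg := ApplyMsg{
                CommandValid: true,
                Command:      rf.log[rf.lastApplied].Command,
                CommandIndex: rf.lastApplied,
            }
            rf.mu.Unlock()
            applyCh <- msg // send without holding the lock
            rf.mu.Lock()
        }
        rf.mu.Unlock()
        time.Sleep(10 * time.Millisecond)
    }
}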
The log replication flow
client sends a command -> leader
leader appends it to its own log -> sends AppendEntries RPCs to the followers -> each follower appends it to its log (replication)
-> a majority of followers acknowledge -> leader applies the command to its state machine (the entry is now committed)
-> followers learn the entry is committed -> followers apply the command to their state machines
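At the leader, the "majority acknowledge" step boils down to counting matchIndex. A sketch (a hypothetical helper, not taken from the original code) that also applies the rule from the paper's Figure 8, committing by counting only entries from the leader's own term:

// Sketch: advance commitIndex to the highest N such that a majority of
// matchIndex[i] >= N and log[N].Term == currentTerm. Caller must hold rf.mu.
func (rf *Raft) updateCommitIndex() {
    for n := len(rf.log) - 1; n > rf.commitIndex; n-- {
        if rf.log[n].Term != rf.currentTerm {
            continue // only current-term entries commit by counting replicas
        }
        count := 1 // the leader itself
        for i := range rf.peers {
            if i != rf.me && rf.matchIndex[i] >= n {
                count++
            }
        }
        if count > len(rf.peers)/2 {
            rf.commitIndex = n
            break
        }
    }
}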
The complete code is on gitee; below I only record the problems hit while getting the tests to pass.
TestBasicAgree2B: I forgot to update the Term in the AppendEntries RPC args, so the logs never synchronized
TestFailAgree2B: the disconnected node keeps hitting its election timeout, so its term keeps growing, and the leader starts receiving AppendEntries replies that reject it on term grounds
So when the leader sees an AppendEntries reply carrying a higher term, it must step down, update its term, and allow a new election
TestFailNoAgree2B passed (but conflictIndex=-1 could appear; fix: while leading, don't re-read rf.currentTerm each time, use the LeaderCurrentTerm saved at the start of the term, so a concurrent RPC can't change it underneath you)
Problem: an apparent deadlock
2021/05/20 12:28:41 Leader 3:Update Server 2 matchIndex = 2, nextIndex = 3
2021/05/20 12:28:41 4: Eletion Timeout, start election
2021/05/20 12:28:41 1: Eletion Timeout, start election
2021/05/20 12:28:41 0: Eletion Timeout, start election
2021/05/20 12:28:42 4: Eletion Timeout, start election
2021/05/20 12:28:43 Follower 4: invalid AppendEntry RPC, args.Term=1, rf.currentTerm=3
2021/05/20 12:28:43 Follower 1: invalid AppendEntry RPC, args.Term=1, rf.currentTerm=2
2021/05/20 12:28:43 Follower 0: invalid AppendEntry RPC, args.Term=1, rf.currentTerm=2
2021/05/20 12:28:43 Leader 3: Saw term 2, step down
2021/05/20 12:28:43 1: Eletion Timeout, start election
2021/05/20 12:28:43 2: refuese vote to 1, because has voted->3.
2021/05/20 12:28:43 4: refuese vote to 1, because has voted->4.
2021/05/20 12:28:43 0: Vote to 1
2021/05/20 12:28:43 0: recieve the candidates' request for vote
2021/05/20 12:28:44 1: Eletion Timeout, start election
2021/05/20 12:28:44 0: Vote to 1
2021/05/20 12:28:44 4: Vote to 1
2021/05/20 12:28:44 2: refuese vote to 1, because has voted->3.
2021/05/20 12:28:44 0: recieve the candidates' request for vote
2021/05/20 12:28:44 4: recieve the candidates' request for vote
2021/05/20 12:28:44 1: Become Leader
Fix: the channel that reports a higher-term heartbeat reply is now buffered (see the sketch below), and term comparisons use the leader's cached term
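The buffered-channel change, sketched; any capacity at least the number of concurrent senders keeps a sender from blocking forever after the leader loop has already returned:

// One slot per peer: a heartbeat goroutine can always complete its send.
higherTermCh := make(chan struct{}, len(rf.peers))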
TestConcurrentStarts2B Failed
It looked like a deadlock, but really matchIndex was never updated, so the leader could not advance commitIndex
Cause: the way nextIndex was updated. The corrected reply handling:
if reply.Success {
// rf.matchIndex[server] = rf.nextIndex[server] // old version: stale once nextIndex moves
rf.nextIndex[server] += len(args.Entries)
rf.matchIndex[server] = rf.nextIndex[server] - 1
// ...
}
TestRejoin2B Failed
An election between two servers with diverging logs went wrong: no leader could be elected
I changed the vote-granting condition in the RequestVote RPC handler (note: this version is still wrong; an equal last-log term alone does not make the candidate's log as up-to-date, so the <= here should be <)
if rf.log[curLogIndex].Term <= args.LastLogTerm || (args.LastLogTerm == rf.log[curLogIndex].Term && args.LastLogIndex >= curLogIndex) { } // candidate's log at least as up-to-date as mine (buggy, see note above)
Problem hit next: in TestFailAgree2B, servers would grant votes based on the term alone, failing the test
So I revised the condition again: the candidate's log counts as newer only when args.LastLogTerm is strictly greater, or the terms are equal and its log is at least as long:
curLogIndex := len(rf.log) - 1
if rf.log[curLogIndex].Term < args.LastLogTerm || (args.LastLogTerm == rf.log[curLogIndex].Term && args.LastLogIndex >= curLogIndex) {} // candidate's log is at least as up-to-date as mine
TestBackup2B Failed
The three servers holding conflicting log entries could not elect a leader, because their terms didn't match
Cause: forgetting to update the term during the election. The fixed vote-collecting goroutine:
go func(server int) {
reply := RequestVoteReply{}
if ok := rf.sendRequestVote(server, &args, &reply); ok {
if reply.VoteGranted {
voteMu.Lock()
voteCount++
if !notified && voteCount >= len(rf.peers) / 2 {
voteOk <- struct{}{}
notified = true // later votes don't signal again
voteMu.Unlock()
} else {
voteMu.Unlock()
}
} else {
if reply.Term > leaderCurrentTerm {
rf.mu.Lock()
rf.currentTerm = reply.Term
rf.mu.Unlock()
}
}
}
}(serverID)
Finally, all tests pass:
$ time go test -run 2B
Test (2B): basic agreement ...
... Passed -- 1.6 3 16 4434 3
Test (2B): RPC byte count ...
... Passed -- 4.9 3 48 114046 11
Test (2B): agreement despite follower disconnection ...
... Passed -- 9.0 3 78 19452 8
Test (2B): no agreement if too many followers disconnect ...
... Passed -- 5.4 5 116 26368 4
Test (2B): concurrent Start()s ...
... Passed -- 2.2 3 14 3906 6
Test (2B): rejoin of partitioned leader ...
... Passed -- 8.1 3 120 28561 4
Test (2B): leader backs up quickly over incorrect follower logs ...
... Passed -- 50.0 5 1968 1587150 102
Test (2B): RPC counts aren't too high ...
... Passed -- 2.8 3 26 7518 12
PASS
ok _/Users/sjy/develop/Go/6.824/src/raft 84.752s
go test -run 2B 2.30s user 1.47s system 4% cpu 1:25.09 total
Lab2C Log Persistence
Persist part of the Raft node's state to "disk" (really the persister)
When using labgob, every concrete type inside the persisted data must be registered; since the first entry of my log holds an empty struct, that type needs registering too. A sketch follows.
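A sketch of the persistence pair, following the comments in the lab skeleton (the exact labgob import path depends on the lab year); it persists currentTerm, votedFor, and log per Figure 2, plus the Register call for the sentinel entry's empty-struct command:

import (
    "bytes"

    "../labgob" // import path as in the 2020 lab layout; adjust to your tree
)

// Register the concrete type stored in log[0].Command so labgob can encode it.
// Call once, e.g. near the top of Make: labgob.Register(struct{}{})

// save Raft's persistent state (Figure 2: currentTerm, votedFor, log[]).
func (rf *Raft) persist() {
    w := new(bytes.Buffer)
    e := labgob.NewEncoder(w)
    e.Encode(rf.currentTerm)
    e.Encode(rf.votedFor)
    e.Encode(rf.log)
    rf.persister.SaveRaftState(w.Bytes())
}

// restore previously persisted state.
func (rf *Raft) readPersist(data []byte) {
    if data == nil || len(data) < 1 { // bootstrap without any state
        return
    }
    r := bytes.NewBuffer(data)
    d := labgob.NewDecoder(r)
    var currentTerm, votedFor int
    var log []LogEntry
    if d.Decode(&currentTerm) != nil ||
        d.Decode(&votedFor) != nil ||
        d.Decode(&log) != nil {
        return // decode failed; keep the freshly initialized defaults
    }
    rf.currentTerm = currentTerm
    rf.votedFor = votedFor
    rf.log = log
}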
Although 2B passed, there were still cases where no leader could be elected, which surfaced while testing 2C. After studying someone else's code, I adopted a stepDown function: when processing heartbeat replies and vote replies, if the reply carries a higher Term, drop straight back to follower and stop processing. A reconstruction is sketched below.
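The stepDown idea, sketched as a reconstruction (the function name comes from the note above; the body is my reading of it, not the referenced code):

// Revert to follower after seeing a higher term in any request or reply.
// Caller must hold rf.mu.
func (rf *Raft) stepDown(term int) {
    rf.currentTerm = term
    rf.role = FOLLOWER
    rf.votedFor = -1
    rf.persist() // currentTerm and votedFor are persistent state
}

Any reply handler can then do: if reply.Term > rf.currentTerm { rf.stepDown(reply.Term); return }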
After revising a few functions, the tests pass; the code is here
$ go test -run 2C
Test (2C): basic persistence ...
labgob warning: Decoding into a non-default variable/field int may not work
... Passed -- 6.5 3 182 37778 6
Test (2C): more persistence ...
... Passed -- 23.5 5 652 139412 16
Test (2C): partitioned leader and one follower crash, leader restarts ...
... Passed -- 4.2 3 52 11504 4
Test (2C): Figure 8 ...
... Passed -- 40.3 5 13452 2786455 15
Test (2C): unreliable agreement ...
... Passed -- 11.0 5 216 72284 246
Test (2C): Figure 8 (unreliable) ...
... Passed -- 40.8 5 1728 5864070 933
Test (2C): churn ...
... Passed -- 16.3 5 700 441128 130
Test (2C): unreliable churn ...
... Passed -- 16.5 5 332 117614 123
PASS
ok _/6.824/src/raft 159.471s