MIT 6.824 Raft

Lab2A Leader election

首先选举出一个leader,让这个leader来管理冗余日志。leader从client接受日志条目,在其他服务器上做冗余备份,并告知其他服务器一个安全的时间点将这些日志应用于他们自己的状态机

leader必须从client接受log entries,并且做好备份

safety:在一个log的index上(多台server的),只能有一种日志entries

5.1 Raft基础

开始的时候都是Follower,timeout后开始选举,变成candidate;1、在timeout后没有接到多数选票,再次进行选举;2、如果接受了多数的选票,变成leader

时间被分割为term,每个term被开始为选举期,如果没有leader被选举出来(split vote),term结束。

每一个server保存一个当前term的数字(随时间线性增长)。不论何时server在沟通的时候,当前的term会被交换。candidate和leader发现自己的term过时的时候,就会转变为follower。如果一个server收到了一个包含过时的term number的request,它会忽略。

5.2 Leader Election

在开始的时候,一个server是follower,当收到leader或者candidate的RPC时,它会一直保持这个状态。Leader会发送定时的heartbeat信息(不含log entries的AppendEntries RPC调用)给所有followers,以保持他们的属性状态。

如果一个follower在election timeout的时间内没有收到任何交流信息,它就开始选举leader

在选举的开始,follower增加它的term number,并且转换成candidate状态。它给自己投票,并且向集群中的其他server发送RequestVote RPC调用。 以下三种状况中,candidate转换自己的状态:

  1. 赢得了选举
  2. 另一个server赢得了选举
  3. 没有winner 以下是三种情况的详细说明:
  4. 赢得选举 <=> 在同一个term中,获取集群中的大多数选票。 一个server最多只能在一个term中投票一个candidate(以FCFS的方式投票),这个机制保证了在一个term中,最多只有一个优胜者。当一个candidate赢得选举后,发送heartbeat信息来终止新的选举。
  5. 在等待选票的过程中,如果收到了其他server的AppendEntries RPC(即他也想当leader),根据term number的大小决定是否变回follower,若调用来自term较大(>= 当前的candidate term)的,则变回follower
  6. 大家都没有获得足够的选票,那么每个candidate都timeout然后进行新一轮的选举,并增加自己的term number

raft使用随机的time-out长度来避免split vote

Raft节点数据结构

// Role is the server's role in the Raft protocol.
type Role int

// The three roles a Raft peer can take. Typed as Role (the original
// left them untyped ints) so assignments to Raft.role are type-checked.
const (
	FOLLOWER Role = iota // 0: passive, answers RPCs, waits for heartbeats
	CANDIDATE            // 1: running an election
	LEADER               // 2: handles client requests, replicates the log
)

// LogEntry is one replicated log slot.
type LogEntry struct {
	Command interface{} // the state-machine command carried by this entry
	Term    int         // term in which this entry was written to the leader's log
}

//
// A Go object implementing a single Raft peer.
//
//
// A Go object implementing a single Raft peer.
//
type Raft struct {
	mu        sync.Mutex          // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()

	// Your data here (2A, 2B, 2C).
	// Look at the paper's Figure 2 for a description of what
	// state a Raft server must maintain.
	currentTerm int
	votedFor    int        // candidate id this peer voted for in currentTerm; -1 if none
	log         []LogEntry // log entries; slot 0 is a sentinel, real entries start at index 1

	role Role // current role: FOLLOWER, CANDIDATE, or LEADER

	commitIndex int // index of the highest log entry known to be committed
	lastApplied int // index of the highest log entry applied to the state machine

	nextIndex  []int // for each server, index of the next log entry to send to it (leader only)
	matchIndex []int // for each server, highest log index known to be replicated on it (leader only)

	// channels used by the RPC handlers to wake the role loops
	chAppendEntry chan struct{} // signalled when a valid heartbeat (AppendEntries) arrives
	chRequestVote chan struct{} // signalled when this peer grants a RequestVote
}

// return currentTerm and whether this server
// believes it is the leader.
// GetState reports this peer's current term and whether it
// believes it is the leader.
func (rf *Raft) GetState() (int, bool) {
	// Your code here (2A).
	rf.mu.Lock()
	defer rf.mu.Unlock()
	return rf.currentTerm, rf.role == LEADER
}

GetState函数

// return currentTerm and whether this server
// believes it is the leader.
// GetState reports this peer's current term and whether it
// believes it is the leader.
func (rf *Raft) GetState() (int, bool) {
	// Your code here (2A).
	rf.mu.Lock()
	defer rf.mu.Unlock()
	return rf.currentTerm, rf.role == LEADER
}

Make函数启动一个Raft节点

// Make creates and starts a Raft peer. peers[] holds the RPC endpoints
// of all peers (this one is peers[me]); persister stores durable state
// across crashes; applyCh is where committed entries are delivered.
func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {
	rf := &Raft{}
	rf.peers = peers
	rf.persister = persister
	rf.me = me

	// Your initialization code here (2A, 2B, 2C).
	rf.currentTerm = 0
	rf.votedFor = -1 // -1 means "voted for nobody yet"
	// Length 1 with spare capacity: slot 0 is a sentinel so real entries
	// start at index 1. (The original make([]LogEntry, 16) created 16
	// zero-valued entries, so len(rf.log)-1 pointed at a garbage slot.)
	rf.log = make([]LogEntry, 1, 16)
	rf.log[0] = LogEntry{struct{}{}, -1}
	rf.commitIndex = 0
	rf.lastApplied = 0
	rf.matchIndex = make([]int, len(rf.peers))
	rf.nextIndex = make([]int, len(rf.peers))

	rf.chAppendEntry = make(chan struct{})
	rf.chRequestVote = make(chan struct{})
	rf.role = FOLLOWER

	// initialize from state persisted before a crash.
	// This must happen BEFORE the serving goroutine starts; the original
	// started the goroutine first, racing the restore against the role loop.
	rf.readPersist(persister.ReadRaftState())

	// start serving
	go rf.startServing()

	return rf
}

主要逻辑函数startServing

// startServing drives the role state machine until the peer is killed.
// Each role handler blocks until the role changes, then control returns
// here and the new role's handler is dispatched.
func (rf *Raft) startServing() {
	for !rf.killed() {
		// Snapshot the role under the lock: the original read rf.role
		// unlocked, which is a data race with changeRole.
		rf.mu.Lock()
		role := rf.role
		rf.mu.Unlock()
		switch role {
		case FOLLOWER:
			rf.following()
		case CANDIDATE:
			rf.election()
		case LEADER:
			rf.sendingHeartBeat()
		}
	}
}

// changeRole sets the peer's role under the state lock.
func (rf *Raft) changeRole(role Role) {
	rf.mu.Lock()
	rf.role = role
	rf.mu.Unlock()
}

作为Follower的处理逻辑

// following runs the follower loop: wait for heartbeats or granted vote
// requests, resetting the election timer on each, and convert to
// candidate when the timer fires.
func (rf *Raft) following() {
	// Seed with nanoseconds: the original seeded with Unix() (second
	// resolution), so peers entering this loop in the same second drew
	// identical "random" timeouts — exactly the split-vote scenario the
	// randomization is meant to avoid.
	rand.Seed(time.Now().UnixNano())
	electionTimeout := rand.Intn(1000) + 400 // 400~1400ms (the paper uses 150~300ms)
	timer := time.NewTimer(time.Millisecond * time.Duration(electionTimeout))
	defer timer.Stop() // release the timer when we leave this role
	for {
		if rf.killed() {
			return
		}
		select {
		case <-timer.C:
			DPrintf("%d: Eletion Timeout, start election\n", rf.me)
			rf.changeRole(CANDIDATE)
			return
		case <-rf.chAppendEntry:
			// Valid heartbeat: push the election deadline back.
			electionTimeout := rand.Intn(1000) + 400
			timer.Reset(time.Millisecond * time.Duration(electionTimeout))
		case <-rf.chRequestVote:
			// We granted a vote: also push the deadline back.
			DPrintf("%d: recieve the candidates' request for vote\n", rf.me)
			electionTimeout := rand.Intn(1000) + 400
			timer.Reset(time.Millisecond * time.Duration(electionTimeout))
		}
	}
}

作为Candidate的处理逻辑

// election runs one candidate round: increment the term, vote for
// ourselves, request votes from all peers, then block until the round
// is decided (majority won, another leader appeared, we granted a vote
// elsewhere, or the round timed out).
func (rf *Raft) election() {
	rf.mu.Lock()
	rf.currentTerm++
	rf.votedFor = rf.me

	// Build the RequestVote arguments while still holding the lock so the
	// term and log snapshot are consistent (the original read them after
	// unlocking — a data race with concurrent RPC handlers).
	args := RequestVoteArgs{}
	args.CandidateID = rf.me
	args.Term = rf.currentTerm
	args.LastLogIndex = len(rf.log) - 1
	args.LastLogTerm = rf.log[args.LastLogIndex].Term
	rf.mu.Unlock()

	voteCount := 0
	// Buffered so the single signalling send can never block: with an
	// unbuffered channel, if the select below has already taken another
	// branch, the sender goroutine would be stuck forever.
	voteOk := make(chan struct{}, 1)
	var voteMu sync.Mutex
	notified := false // whether voteOk has already been signalled

	// stepDown switches role and clears votedFor in one critical section
	// (the original wrote rf.votedFor without holding the lock).
	stepDown := func(role Role) {
		rf.mu.Lock()
		rf.role = role
		rf.votedFor = -1
		rf.mu.Unlock()
	}

	timer := time.NewTimer(time.Millisecond * time.Duration(rand.Intn(300)+500))
	defer timer.Stop()
	for serverID := range rf.peers {
		if serverID != rf.me {
			go func(server int) {
				reply := RequestVoteReply{}
				if ok := rf.sendRequestVote(server, &args, &reply); ok {
					if reply.VoteGranted {
						voteMu.Lock()
						voteCount++
						if !notified && voteCount >= len(rf.peers)/2 {
							notified = true // later votes are not announced again
							voteOk <- struct{}{}
						}
						voteMu.Unlock()
					}
				}
			}(serverID)
		}
	}
	select {
	case <-voteOk: // a majority voted for us
		DPrintf("%d: Become Leader", rf.me)
		stepDown(LEADER)
		return
	case <-rf.chAppendEntry: // another leader has already been elected
		stepDown(FOLLOWER)
		return
	case <-rf.chRequestVote: // we granted a vote to another candidate
		stepDown(FOLLOWER)
		DPrintf("Receive others' requestVote")
		return
	case <-timer.C: // round timed out without a winner
		stepDown(FOLLOWER)
		return
	}
}

作为Leader的处理逻辑

// sendingHeartBeat is the leader loop: broadcast empty AppendEntries
// (heartbeats) every 200ms, and step down to follower when a higher
// term is seen in a reply or in an incoming RPC.
func (rf *Raft) sendingHeartBeat() {
	rf.mu.Lock()
	lastLogIndex := len(rf.log) - 1
	// Reinitialize the per-peer replication state on taking office (Figure 2).
	for i := range rf.peers {
		rf.nextIndex[i] = lastLogIndex + 1
		rf.matchIndex[i] = 0
	}

	args := AppendEntriesArgs{}
	args.Term = rf.currentTerm
	args.LeaderID = rf.me
	args.PrevLogIndex = lastLogIndex // index of log entry immediately preceding new ones
	args.PrevLogTerm = rf.log[args.PrevLogIndex].Term
	args.Entries = []LogEntry{} // empty entries == heartbeat
	args.LeaderCommit = rf.commitIndex
	rf.mu.Unlock()

	timer := time.NewTimer(time.Millisecond * 200) // heartbeat interval: 200ms
	defer timer.Stop()
	// Buffered: the reply goroutine sends while holding rf.mu. With the
	// original unbuffered channel, if this loop had already returned, the
	// send would block forever while holding the lock — deadlocking the
	// whole peer. With capacity 1 and the gotHigherTerm guard, the single
	// send always completes immediately.
	higherTermCh := make(chan struct{}, 1)
	gotHigherTerm := false // whether a higher-term reply was already handled

	for {
		for serverID := range rf.peers {
			if serverID == rf.me {
				continue
			}
			go func(server int) {
				reply := AppendEntriesReply{}
				if ok := rf.sendAppendEntry(server, &args, &reply); ok {
					rf.mu.Lock()
					if !gotHigherTerm && reply.Term > rf.currentTerm {
						rf.currentTerm = reply.Term
						gotHigherTerm = true
						higherTermCh <- struct{}{} // never blocks: buffered, sent at most once
					}
					rf.mu.Unlock()
				}
			}(serverID)
		}

		select {
		case <-timer.C:
			timer.Reset(time.Millisecond * 200)
		case <-higherTermCh: // discovered a server with a higher term
			rf.changeRole(FOLLOWER)
			return
		case <-rf.chAppendEntry: // received a higher-term AppendEntries
			rf.changeRole(FOLLOWER)
			return
		case <-rf.chRequestVote: // received a higher-term RequestVote
			rf.changeRole(FOLLOWER)
			return
		}
	}
}

AppendEntries(心跳接受的RPC)

// AppendEntry is the AppendEntries RPC handler (2A: heartbeats only —
// requests carrying log entries are ignored here).
func (rf *Raft) AppendEntry(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	if len(args.Entries) == 0 { // heartbeat
		if args.Term < rf.currentTerm { // stale leader: reject and report our term
			reply.Success = false
			reply.Term = rf.currentTerm
		} else {
			DPrintf("%d: Got valid heartbeat from %d\n", rf.me, args.LeaderID)
			rf.currentTerm = args.Term
			reply.Success = true
			reply.Term = rf.currentTerm // Figure 2: every reply carries our term (original omitted it here)
			// Wake the role loop asynchronously: the channel is unbuffered,
			// so sending inline while holding rf.mu would deadlock.
			go func() {
				rf.chAppendEntry <- struct{}{}
			}()
		}
	}
}

RequestVote(投票请求的处理)

// RequestVoteArgs is the argument struct for the RequestVote RPC.
type RequestVoteArgs struct {
	// Your data here (2A, 2B).
	Term         int // candidate's term
	CandidateID  int // candidate's id (its index into the peers array)
	LastLogIndex int // index of the candidate's last log entry
	LastLogTerm  int // term of the candidate's last log entry
}

//
// example RequestVote RPC reply structure.
// field names must start with capital letters!
//
//
// example RequestVote RPC reply structure.
// field names must start with capital letters!
//
type RequestVoteReply struct {
	// Your data here (2A).
	Term        int  // currentTerm, for candidate to update itself
	VoteGranted bool // true means the vote was granted
}

// RequestVote is the RequestVote RPC handler. Grant the vote iff the
// candidate's term is not stale, we have not voted for someone else
// this term, and the candidate's log is at least as up-to-date as ours
// (election restriction, Raft paper §5.4.1).
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	// Your code here (2A, 2B).
	rf.mu.Lock()
	defer rf.mu.Unlock()
	if args.Term < rf.currentTerm {
		reply.VoteGranted = false
		reply.Term = rf.currentTerm
		DPrintf("%d: refuse vote to %d\n", rf.me, args.CandidateID)
	} else if rf.votedFor == -1 || rf.votedFor == args.CandidateID {
		curLogIndex := len(rf.log) - 1
		// Up-to-date check (§5.4.1): a strictly higher last term wins;
		// with equal last terms, the longer (or equal) log wins. The
		// original used args.LastLogTerm >= ..., which wrongly treated a
		// same-term but shorter log as up-to-date.
		if args.LastLogTerm > rf.log[curLogIndex].Term ||
			(args.LastLogTerm == rf.log[curLogIndex].Term && args.LastLogIndex >= curLogIndex) {
			DPrintf("%d: Vote to %d\n", rf.me, args.CandidateID)
			reply.VoteGranted = true
			reply.Term = args.Term
			rf.currentTerm = args.Term
			rf.votedFor = args.CandidateID

			// Notify the role loop that we granted a vote. Asynchronous:
			// the channel is unbuffered, so an inline send under rf.mu
			// would deadlock.
			go func() {
				rf.chRequestVote <- struct{}{}
			}()
		} else {
			reply.VoteGranted = false
			reply.Term = rf.currentTerm // Figure 2: every reply carries our term
			DPrintf("%d: refuese vote to %d, because the old log\n", rf.me, args.CandidateID)
		}
	} else {
		reply.VoteGranted = false
		reply.Term = rf.currentTerm // Figure 2: every reply carries our term
		DPrintf("%d: refuese vote to %d, because has voted->%d.\n", rf.me, args.CandidateID, rf.votedFor)
	}
}

运行结果

➜  raft git:(master) ✗ go test -run 2A -race
Test (2A): initial election ...
  ... Passed --   4.6  3   38    9128    0
Test (2A): election after network failure ...
  ... Passed --   7.5  3   86   17588    0
PASS
ok  	_/Users/sjy/develop/Go/6.824/src/raft	12.486s

Lab2B Log replication

从测试出发进行理解(applyCh的意义:将日志应用到状态机上)

从TestBasicAgree2B 测试函数出发 -> make_config -> start1
start1函数中有将commit command 写入cfg的log中(从通道applyCh中接收新的command)
在TestBasicAgree2B 测试函数中 nCommitted函数会检查cfg的log,查看是否大多数节点已经agreement了
所以需要把start1中的applyCh传给当前的server节点,当有新的log entry commit后,通过applyCh发送相关消息

leader的nextIndex数组保存着要发给每个server的log index,初始化为leader 的最后一个log的下标+1(len(log))
leader的matchIndex保存着已经复制到某个server的log index,初始化为0;leader不断的检查matchIndex,来确定是否存在新的commitIndex

每个server都会不停得检查lastApplied Index,一但commitIndex大于lastApplied,当前Server会把这条日志通过applyCh应用到状态机里

日志复制的流程

client发送command -> leader
leader append to leader's log -> send RPC to follower -> follower append to log(Replicate)
-> majority follower agree -> leader apply the command to state machine(this log marked as committed)
-> follower know the log is committed -> follower apply the command to state machine

完整代码我放到gitee上了,下面只记录通过测试的时候的遇到的各种问题。

TestBasicAgree2B 中由于忘记更新AppendEntries RPC的Term,导致一直不同步

TestFailAgree2B 中由于脱离连接的节点不停得ElectionTimeout,所以Term一直增加,Leader会收到对AppendEntries拒绝的reply,因为Term的原因
所以Leader在收到高Term的AppendEntries Reply后,需要step down,重新选举,更新term

TestFailNoAgree2B Passed (但是会出现conflictIndex=-1的情况,解决:在Leader的Term中,不要立即使用rf.currentTerm,使用之前保存的LeaderCurrentTerm,防止被RPC篡改)
问题,出现类似死锁
 

2021/05/20 12:28:41 Leader 3:Update Server 2 matchIndex = 2, nextIndex = 3
2021/05/20 12:28:41 4: Eletion Timeout, start election
2021/05/20 12:28:41 1: Eletion Timeout, start election
2021/05/20 12:28:41 0: Eletion Timeout, start election
2021/05/20 12:28:42 4: Eletion Timeout, start election
2021/05/20 12:28:43 Follower 4: invalid AppendEntry RPC, args.Term=1, rf.currentTerm=3
2021/05/20 12:28:43 Follower 1: invalid AppendEntry RPC, args.Term=1, rf.currentTerm=2
2021/05/20 12:28:43 Follower 0: invalid AppendEntry RPC, args.Term=1, rf.currentTerm=2
2021/05/20 12:28:43 Leader 3: Saw term 2, step down
2021/05/20 12:28:43 1: Eletion Timeout, start election
2021/05/20 12:28:43 2: refuese vote to 1, because has voted->3.
2021/05/20 12:28:43 4: refuese vote to 1, because has voted->4.
2021/05/20 12:28:43 0: Vote to 1
2021/05/20 12:28:43 0: recieve the candidates' request for vote
2021/05/20 12:28:44 1: Eletion Timeout, start election
2021/05/20 12:28:44 0: Vote to 1
2021/05/20 12:28:44 4: Vote to 1
2021/05/20 12:28:44 2: refuese vote to 1, because has voted->3.
2021/05/20 12:28:44 0: recieve the candidates' request for vote
2021/05/20 12:28:44 4: recieve the candidates' request for vote
2021/05/20 12:28:44 1: Become Leader

把心跳接收到高Term的回复后的通知管道改为有缓冲的了,同时比较Term的时候用leader的term缓存来比较

TestConcurrentStarts2B Failed

貌似出现死锁,其实是matchIndex一直不更新,导致Leader无法更新commitIndex
原因:nextIndex的更新方式

if reply.Success {
    // rf.matchIndex[server] = rf.nextIndex[server]
    rf.nextIndex[server] += len(args.Entries)
    rf.matchIndex[server] = rf.nextIndex[server] - 1
    // ...
}

TestRejoin2B Failed

在两个日志不一样的server选举的时候出现问题,无法选出leader

修改RequestVote RPC handler的投票同意条件(注意其实这里错了,log的term相等的时候不能保证是一样新的,所以此处的<=应该是<)

if rf.log[curLogIndex].Term <= args.LastLogTerm || (args.LastLogTerm == rf.log[curLogIndex].Term && args.LastLogIndex >= curLogIndex) { } // candidates的log至少和我一样新

遇到问题:在TestFailAgree2B中,发现会仅仅根据term投票给candidate,导致测试不通过

再次修改投票同意条件,只有args.LastLogTerm比当前节点大的时候才认为比当前的log新

curLogIndex := len(rf.log) - 1
if rf.log[curLogIndex].Term < args.LastLogTerm || (args.LastLogTerm == rf.log[curLogIndex].Term && args.LastLogIndex >= curLogIndex) {} // candidates的log至少和我一样新

TestBackup2B Failed

发现包含冲突的日志的3个server不能选举出leader,因为term的不匹配
原因:在选举时忘记更新term

go func(server int) {
	reply := RequestVoteReply{}
	if ok := rf.sendRequestVote(server, &args, &reply); ok {
		if reply.VoteGranted {
			voteMu.Lock()
			voteCount++
			if !notified && voteCount >= len(rf.peers) / 2 {
				voteOk <- struct{}{}
				notified = true  // 其他的选票就不再通知了
				voteMu.Unlock()
			} else {
				voteMu.Unlock()
			}
		} else {
			if reply.Term > leaderCurrentTerm {
				rf.mu.Lock()
				rf.currentTerm = reply.Term
				rf.mu.Unlock()
			}
		}
	}
}(serverID)

最终通过所有的测试:

$ time go test -run 2B
Test (2B): basic agreement ...
  ... Passed --   1.6  3   16    4434    3
Test (2B): RPC byte count ...
  ... Passed --   4.9  3   48  114046   11
Test (2B): agreement despite follower disconnection ...
  ... Passed --   9.0  3   78   19452    8
Test (2B): no agreement if too many followers disconnect ...
  ... Passed --   5.4  5  116   26368    4
Test (2B): concurrent Start()s ...
  ... Passed --   2.2  3   14    3906    6
Test (2B): rejoin of partitioned leader ...
  ... Passed --   8.1  3  120   28561    4
Test (2B): leader backs up quickly over incorrect follower logs ...
  ... Passed --  50.0  5 1968 1587150  102
Test (2B): RPC counts aren't too high ...
  ... Passed --   2.8  3   26    7518   12
PASS
ok  	_/Users/sjy/develop/Go/6.824/src/raft	84.752s
go test -run 2B  2.30s user 1.47s system 4% cpu 1:25.09 total

Lab2C Log Persistence

将raft节点的部分信息持久化到“disk”上(其实是persister)

在labgob的使用中,需要注册持久化数据的基本数据类型,由于我的日志条目的第一条内容是个空的struct,所以需要注册这个类型。

虽然通过了2B,但还有情况选不出来leader,在测试2C的过程中发现了。参考了下大佬的代码,发现stepDown函数的妙用。主要是在接受心跳回复和投票回复时,进行处理判断,如果收到了高Term的回复,直接退回到follower状态,不再处理。

部分函数修改后,通过了测试,代码在这里

$ go test -run 2C
Test (2C): basic persistence ...
labgob warning: Decoding into a non-default variable/field int may not work
  ... Passed --   6.5  3  182   37778    6
Test (2C): more persistence ...
  ... Passed --  23.5  5  652  139412   16
Test (2C): partitioned leader and one follower crash, leader restarts ...
  ... Passed --   4.2  3   52   11504    4
Test (2C): Figure 8 ...
  ... Passed --  40.3  5 13452 2786455   15
Test (2C): unreliable agreement ...
  ... Passed --  11.0  5  216   72284  246
Test (2C): Figure 8 (unreliable) ...
  ... Passed --  40.8  5 1728 5864070  933
Test (2C): churn ...
  ... Passed --  16.3  5  700  441128  130
Test (2C): unreliable churn ...
  ... Passed --  16.5  5  332  117614  123
PASS
ok  	_/6.824/src/raft	159.471s