日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

用 Go 语言实现 Raft 选主

發布時間:2025/3/21 编程问答 15 豆豆
生活随笔 收集整理的這篇文章主要介紹了 用 Go 语言实现 Raft 选主 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

用 Go 語言實現 Raft 選主

選主模塊主要包括三大功能:

  • candidate狀態下的選主功能
  • leader狀態下的心跳廣播功能
  • follower狀態下的確認功能

candidate狀態下的選主功能

candidate狀態下的選主功能需要關注兩個方面:

  • 何時進入candidate狀態,進行選主?
  • 選主的邏輯是怎樣的?

首先,來討論何時進入candidate狀態,進行選主

在一定時間內沒有收到來自leader或者其他candidate的有效RPC時,將會觸發選主。這里需要關注的是有效兩個字,要么是leader發的有效的心跳信息,要么是candidate發的是有效的選主信息,即server本身確認這些信息是有效的后,才會重新更新超時時間,超時時間根據raft論文中推薦設置為[150ms,300ms],并且每次是隨機生成的值。

其次,來討論選主的邏輯。

server首先會進行選主的初始化操作,即server會增加其term,把狀態改成candidate,然后選舉自己為主,并把選主的RPC并行地發送給集群中其他的server,根據返回的RPC的情況的不同,做不同的處理:

  • 該server被選為leader
  • 其他的server選為leader
  • 一段時間后,沒有server被選為leader

針對情況一,該server被選為leader,當前僅當在大多數的server投票給該server時。當其被選為主時,會立馬發送心跳消息給其他的server,來表明其已經是leader,防止發生新的選舉。

針對情況二,其他的server被選為leader,它會收到leader發送的心跳信息,此時,該server應該轉為follower,然后退出選舉。

針對情況三,一段時間后,沒有server被選為leader,這種情況發生在沒有server獲得了大多數的server的投票情況下,此時,應該發起新一輪的選舉。

leader狀態下的心跳廣播功能

當某個server被選為leader后,需要廣播心跳信息,表明其是leader,主要在以下兩個場景觸發:

  • server剛當選為leader
  • server周期性的發送心跳消息,防止其他的server進入candidate選舉狀態

leader廣播心跳的邏輯為,如果廣播的心跳信息得到了大多數的server的確認,那么更新leader自身的選舉超時時間,防止發生重新選舉。

follower狀態下的確認功能

主要包括對candidate發的選舉RPC以及leader發來的心跳RPC的確認功能。

對于選舉RPC,假設candidate c發送選舉RPC到該follower,由于follower每個term只能選舉一個server,因此,只有當一個follower沒有選舉其他server的時候,并且選舉RPC中的candidate c的term大于或等于follower的term時,才會返回選舉當前candidate c為主,否則,則返回拒絕選舉當前candidate c為主。

對于leader的心跳RPC,如果leader的心跳的term大于或等于follower的term,則認可該leader的心跳,否則,不認可該leader的心跳。

代碼實現

candidate狀態下的選主功能

根據前面描述,主要的邏輯為

  • 等待選舉超時
  • 增加term,置狀態為follower,并且選舉自己為leader
  • 向其他的server并行地發送選舉RPC,直到碰到上述描述的三種情況退出
func (rf *Raft) election_one_round() bool {// begin electionvar timeout int64var done intvar triggerHeartbeat booltimeout = 100last := milliseconds()success := falserf.mu.Lock()rf.becomeCandidate()rf.mu.Unlock()printTime()rpcTimeout := 20fmt.Printf("candidate=%d start electing leader\n", rf.me)for {for i := 0; i < len(rf.peers); i++ {if i != rf.me {var args RequestVoteArgsserver := iargs.Term = rf.currentTermargs.CandidateId = rf.mevar reply RequestVoteReplyprintTime()fmt.Printf("candidate=%d send request vote to server=%d\n", rf.me, i)go rf.sendRequestVoteAndTrigger(server, args, &reply, rpcTimeout)}}done = 0triggerHeartbeat = falsefor i := 0; i < len(rf.peers)-1; i++ {printTime()fmt.Printf("candidate=%d waiting for select for i=%d\n", rf.me, i)select {case ok := <-rf.electCh:if ok {done++success = done >= len(rf.peers)/2 || rf.currentLeader > -1success = success && rf.votedFor == rf.meif success && !triggerHeartbeat {triggerHeartbeat = truerf.mu.Lock()rf.becomeLeader()rf.mu.Unlock()rf.heartbeat <- trueprintTime()fmt.Printf("candidate=%d becomes leader\n", rf.currentLeader)}}}printTime()fmt.Printf("candidate=%d complete for select for i=%d\n", rf.me, i)}if (timeout+last < milliseconds()) || (done >= len(rf.peers)/2 || rf.currentLeader > -1) {break} else {select {case <-time.After(time.Duration(10) * time.Millisecond):}}}printTime()fmt.Printf("candidate=%d receive votes status=%t\n", rf.me, success)return success }

首先等待選舉超時,超時后,會進入真正的選舉邏輯election_one_round()

首先,進入candidate狀態,增加其term,然后,選舉自己。

func (rf *Raft) becomeCandidate() { rf.state = 1 rf.setTerm(rf.currentTerm + 1)rf.votedFor = rf.merf.currentLeader = -1 }

接著,向除自己外的server發送選舉RPC,等待server的回復

fmt.Printf("candidate=%d start electing leader\n", rf.me)for {for i := 0; i < len(rf.peers); i++ {if i != rf.me {var args RequestVoteArgsserver := iargs.Term = rf.currentTermargs.CandidateId = rf.mevar reply RequestVoteReplyprintTime()fmt.Printf("candidate=%d send request vote to server=%d\n", rf.me, i)go rf.sendRequestVoteAndTrigger(server, args, &reply, rpcTimeout)}}done = 0triggerHeartbeat = falsefor i := 0; i < len(rf.peers)-1; i++ {printTime()fmt.Printf("candidate=%d waiting for select for i=%d\n", rf.me, i)select {case ok := <-rf.electCh:if ok {done++success = done >= len(rf.peers)/2 || rf.currentLeader > -1success = success && rf.votedFor == rf.meif success && !triggerHeartbeat {triggerHeartbeat = truerf.mu.Lock()rf.becomeLeader()rf.mu.Unlock()rf.heartbeat <- trueprintTime()fmt.Printf("candidate=%d becomes leader\n", rf.currentLeader)}}}printTime()fmt.Printf("candidate=%d complete for select for i=%d\n", rf.me, i)}if (timeout+last < milliseconds()) || (done >= len(rf.peers)/2 || rf.currentLeader > -1) {break} else {select {case <-time.After(time.Duration(10) * time.Millisecond):}}}printTime()fmt.Printf("candidate=%d receive votes status=%t\n", rf.me, success)return success

當成功返回數目到多數派時(包含自己在內),則宣布自己稱為leader,即becomeLeader(),如下

func (rf *Raft) becomeLeader() {rf.state = 2rf.currentLeader = rf.me }

即,修改自身狀態為leader。然后,給發送心跳的線程發送 rf.heartbeat <-true,通知心跳線程開始發心跳包。

leader狀態下的廣播心跳功能

首先,來看觸發心跳的邏輯

func (rf *Raft) sendLeaderHeartBeat() {timeout := 20for { select {case <-rf.heartbeat:rf.sendAppendEntriesImpl()case <-time.After(time.Duration(timeout) * time.Millisecond):rf.sendAppendEntriesImpl()} } }

分為兩個方面:

  • 第一個為剛當選為leader后,需要馬上發送心跳信息,防止新的選舉發生

  • 第二個是leader周期性的發送心跳信息,來宣布自己為主

真正的廣播心跳的邏輯如下:

func (rf *Raft) sendAppendEntriesImpl() {if rf.currentLeader == rf.me {var args AppendEntriesArgsvar success_count inttimeout := 20args.LeaderId = rf.meargs.Term = rf.currentTermprintTime()fmt.Printf("broadcast heartbeat start\n")for i := 0; i < len(rf.peers); i++ {if i != rf.me {var reply AppendEntriesReplyprintTime()fmt.Printf("Leader=%d send heartbeat to server=%d\n", rf.me, i)go rf.sendHeartBeat(i, args, &reply, timeout)}}for i := 0; i < len(rf.peers)-1; i++ {select {case ok := <-rf.heartbeatRe:if ok {success_count++if success_count >= len(rf.peers)/2 {rf.mu.Lock()rf.setMessageTime(milliseconds())rf.mu.Unlock()}}}}printTime()fmt.Printf("broadcast heartbeat end\n")if success_count < len(rf.peers)/2 {rf.mu.Lock()rf.currentLeader = -1rf.mu.Unlock()}} }

先是向集群中所有的其他server廣播心跳,分為兩種結果:

  • 收到了大多數server的確認,則更新leader的超時時間,防止重新進入選舉狀態

  • 未收到大多數server的確認,則會退出發送心跳的邏輯,即置currentLeader = -1,此后,自然會有選舉超時的server重新發起選舉

follower狀態下的確認功能

包括對選舉RPC的確認已經對心跳RPC的確認。

選舉RPC的確認邏輯如下

func (rf *Raft) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) {// Your code here.currentTerm, _ := rf.GetState()if args.Term < currentTerm {reply.Term = currentTermreply.VoteGranted = falseprintTime() fmt.Printf("candidate=%d term = %d smaller than server = %d, currentTerm = %d\n", args.CandidateId, args.Term, rf.me, rf.currentTerm)return } if rf.votedFor != -1 && args.Term <= rf.currentTerm {reply.VoteGranted = falserf.mu.Lock()rf.setTerm(max(args.Term, currentTerm))reply.Term = rf.currentTermrf.mu.Unlock()printTime() fmt.Printf("rejected candidate=%d term = %d server = %d, currentTerm = %d, has_voted_for = %d\n", args.CandidateId, args.Term, rf.me, rf.currentTerm, rf.votedFor)} else { rf.mu.Lock()rf.becomeFollower(max(args.Term, currentTerm), args.CandidateId)rf.mu.Unlock()reply.VoteGranted = truefmt.Printf("accepted server = %d voted_for candidate = %d\n", rf.me, args.CandidateId)} }

如果當前server的term大于candidate的term,或者當前server已經選舉過其他server為leader了,那么返回拒絕的RPC,否則,則返回成功的RPC,并置自身狀態為follower。

心跳的RPC的邏輯如下

func (rf *Raft) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) {if args.Term < rf.currentTerm {reply.Success = falsereply.Term = rf.currentTerm} else { reply.Success = truereply.Term = rf.currentTermrf.mu.Lock() rf.currentLeader = args.LeaderIdrf.votedFor = args.LeaderIdrf.state = 0 rf.setMessageTime(milliseconds())printTime() fmt.Printf("server = %d learned that leader = %d\n", rf.me, rf.currentLeader)rf.mu.Unlock()} }

如果follower的term大于leader的term,則返回拒絕的RPC,否則,返回成功的RPC。

總結

以上是生活随笔為你收集整理的用 Go 语言实现 Raft 选主的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。