日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

初见akka-02:rpc框架

發布時間:2025/3/15 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 初见akka-02:rpc框架 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

  1.RPC:簡單點說,就是多線程之間的通信,我們今天用了scala以及akka

   來簡單的實現了

   rpc框架的一些簡單的內容,一臉包括了,心跳,間隔時間,

   注冊以及一些問題,

   模式匹配的一些東西,雖然比較簡單,但是屬于麻雀雖小,五臟俱全

   這個里面一共有有四個文件:

   Master.scala

   RemoteMessage.scala

   Worker.scala

   WorkerInfo

  

   Master.scala

package cn.wj.rpcimport akka.actor.{Actor, ActorSystem} import akka.actor.Actor.Receive import com.typesafe.config.ConfigFactory import akka.actor.Props import scala.concurrent.duration._import scala.collection.mutable/*** Created by WJ on 2016/12/23.*/ class Master(val host:String,val port:Int ) extends Actor {// workerId -> WorkerInfoval idToWorker = new mutable.HashMap[String,WorkerInfo]()val workers = new mutable.HashSet[WorkerInfo]()//時間間隔時間,超時檢測的間隔val CHECK_INTERVAL = 15000//用于接收消息override def receive: Receive = {case RegisterWorker(id,memory,cores) => { // println("a client connected") // sender ! "reply" //往發送給他消息的人回復一個消息//判斷一下是不是已經注冊過了if(!(idToWorker.contains(id))){//把Worker的信息封裝以前,保存到內存當中val workerInfo = new WorkerInfo(id,memory,cores)idToWorker(id) = workerInfo //這個應該是scala的特定版本workers += workerInfosender ! RegisteredWorker(s"akka.tcp://MasterSystem@$host:$port/user/Master")}}case Heartbeat(id) =>{if(idToWorker.contains(id)) {val workerInfo = idToWorker(id)//報活//得到系統當前時間val currentTime = System.currentTimeMillis()workerInfo.lastHeartbeatTime = currentTime}}case CheckTimeOutWorker => {val currentTime = System.currentTimeMillis()val toRemove = workers.filter(x => currentTime - x.lastHeartbeatTime > CHECK_INTERVAL)for(w <- toRemove){workers -= widToWorker -= w.id}println(workers.size)}}override def preStart(): Unit = {println("prestart invoked")//導入隱式轉換的功能import context.dispatchercontext.system.scheduler.schedule(0 millis,CHECK_INTERVAL millis,self,CheckTimeOutWorker)} }object Master{def main(args: Array[String]): Unit = {val host = args(0)val port = args(1).toInt// 準備配置val configStr =s"""|akka.actor.provider = "akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname = "$host"|akka.remote.netty.tcp.port = "$port"""".stripMarginval config = ConfigFactory.parseString(configStr)//ActorSystem老大,輔助創建和監控下面的Actor,他是單例的val actorSystem = ActorSystem("MasterSystem",config )//創建Actorval master = actorSystem.actorOf(Props(new Master(host,port)),"Master")actorSystem.awaitTermination()} }

  Worker.scala

package cn.wj.rpcimport java.util.UUIDimport akka.actor.{Actor, ActorSelection, ActorSystem, Props} import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ /*** Created by WJ on 2016/12/23.*/ class Worker(val masterHost:String,val masterPort:Int,val memory:Int,val cores:Int) extends Actor {var master : ActorSelection = _val workerId = UUID.randomUUID().toStringval HEART_INTERVAL = 10000//preStart執行方法的時機:構造器之后,receive之前//與Master(Actor)建立連接override def preStart(): Unit = {//master已經是別的Master的引用了 ,這是跟master建立連接master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master")//向Master發送注冊消息master ! RegisterWorker(workerId,memory,cores)}override def receive: Receive = {case RegisteredWorker(masterUrl) => {println(masterUrl)//啟動定時器發送心跳import context.dispatchercontext.system.scheduler.schedule(0 millis,HEART_INTERVAL millis,self,SendHeartbeat)}case SendHeartbeat =>{println("send heartbeat to master")master ! Heartbeat(workerId)}} }object Worker{def main(args: Array[String]): Unit = {val host = args(0)val port = args(1).toIntval masterHost = args(2)val masterPort = args(3).toIntval memory = args(4).toIntval cores = args(5).toInt// 準備配置val configStr =s"""|akka.actor.provider = "akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname = "$host"|akka.remote.netty.tcp.port = "$port"""".stripMarginval config = ConfigFactory.parseString(configStr)//ActorSystem老大,輔助創建和監控下面的Actor,他是單例的val actorSystem = ActorSystem("WorkerSystem",config )//創建Actor,此時調用該(Actor)的prestart以及receive方法actorSystem.actorOf(Props(new Worker(masterHost,masterPort,memory,cores)),"Worker")actorSystem.awaitTermination()} }

  RemoteMessage.scala

package cn.wj.rpc/*** Created by WJ on 2016/12/25.*/ trait RemoteMessage extends Serializable//Worker->Master(這個表明當master接受這個worker時的信息,是receive)case class RegisterWorker (id:String, memory:Int, cores:Int) extends RemoteMessage//Master -> Worker(這個是master收到workerd的注冊信息,表明已經注冊過這條信息,是sender ! xxx時候出現的) case class RegisteredWorker(masterUrl:String) extends RemoteMessage//這是進程之間自己給自己發送消息,所以采用case object,并且不需要實現Serializable //Worker -> Worker(self) case object SendHeartbeat//這個是work向master發送定時器,其中的id是work的id,因為要向master說明,是哪一個work給他發送的心跳 //Worker -> Master case class Heartbeat(id:String) extends RemoteMessage//Master -> self case object CheckTimeOutWorker

  WorkerInfo.scala

package cn.wj.rpc/*** Created by WJ on 2016/12/25.*/ class WorkerInfo(val id:String ,val memory :Int,val cores:Int) {//TODO 上一次心跳var lastHeartbeatTime:Long = _ }

  這個上面的四個就是簡單的實現了RPC框架,其實就是一個Master監控多個Worker,

  當一個Worker創建了,他就是需要在Master注冊信息,其實這個Master個人感覺就像

  是個Zookeeper,掌管Worker的信息,為其Worker分配一些資源,當Master接到Worker

  的注冊信息的時候,他就在自己的注冊表添加上這個Worker,然后向Worker發送一個注冊

  成功的信息,此時這個Worker的收到這個注冊信息,然后他就給Master發送心跳,這個的

  作用是在告訴Master,我這個Worker是存活的(報活),當一個Worke發送心跳的時間間隔

  過長,長過我們規定的時間,那么此時我們就需要主動殺死這個Worker,感覺hadoop的一些

  分布式和這個原理差不多。

  下面奉上原理圖一張:

  

  其中的receive是用于接受信息,因為繼承Actor,

  prestart這個方法是執行實在類實例之后,receive的方法之后

2.RPC的大概流程

?

  首先定義了一個worker,一個master,master首先啟動了,

  然后它在prestart()的方法里面
  檢測超時的worker,那么在這個里面啟動了一個定時器,

  那么我們自己是不是自己可以手寫一個定時器,
  比如我們可以用線程來搞定時器,但是我們的akka

  里面提供了一個超級簡單的定時器,
  context.system.schedular.schedule

  (0 millis,CHECK_INTERVAL millis,self,CheckTimeOutWorker)
  其中第一個參數:延遲多少秒
  第二個參數:時間間隔
  第三個參數:把這個消息發給誰
  第四個參數:發送什么消息
  雖然它起了消息,但是他不能一下子就把消息發送出去
  ,它只能把消息先發送給自己的receive接收到這個消息,
  然后在發送給我們master,這個里面有一個檢測,
  檢測worker有多長時間沒有向我發送心跳了,
  如果這個時間大過了我規定的范圍,
  這樣,Master啟動完成檢測心跳,worker啟動完成后
  ,首先向master建立連接,然后發送注冊消息
  ,master接受到這個注冊消息,
  把worker的信息保存到內存當中,然后向worker反饋一個消息,
  說你注冊成功了,然后worker啟動一個定時器,
  定時的向master發送心跳,就是這樣的流程

  

?

轉載于:https://www.cnblogs.com/wnbahmbb/p/6220528.html

總結

以上是生活随笔為你收集整理的初见akka-02:rpc框架的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。