日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

Akka并发编程——第三节:Actor模型(二)

發(fā)布時(shí)間:2024/1/23 编程问答 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Akka并发编程——第三节:Actor模型(二) 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

本節(jié)主要內(nèi)容:

  • Actor API解析
  • 1. Actor API解析

    Actor中的主要成員變量和方法定義如下:

    package akka.actor trait Actor extends scala.AnyRef {type Receive = akka.actor.Actor.Receive//context變量暴露當(dāng)前Actor的上下文信息及當(dāng)前消息implicit val context : akka.actor.ActorContext = { /* compiled code */ }//self作為當(dāng)前ActorRef的引用implicit final val self : akka.actor.ActorRef = { /* compiled code */ }//當(dāng)前Actor接收到最后一條消息對(duì)應(yīng)的消息發(fā)送者(Actor)final def sender() : akka.actor.ActorRef = { /* compiled code */ }//receive方法,抽象方法,定義Actor的行為邏輯def receive : akka.actor.Actor.Receive//內(nèi)部使用API protected[akka] def aroundReceive(receive : akka.actor.Actor.Receive, msg : scala.Any) : scala.Unit = { /* compiled code */ }protected[akka] def aroundPreStart() : scala.Unit = { /* compiled code */ }protected[akka] def aroundPostStop() : scala.Unit = { /* compiled code */ }protected[akka] def aroundPreRestart(reason : scala.Throwable, message : scala.Option[scala.Any]) : scala.Unit = { /* compiled code */ }protected[akka] def aroundPostRestart(reason : scala.Throwable) : scala.Unit = { /* compiled code */ }//監(jiān)督策略,用于Actor容錯(cuò)處理def supervisorStrategy : akka.actor.SupervisorStrategy = { /* compiled code */ }//Hook方法,用于Actor生命周期監(jiān)控 @scala.throws[T](classOf[scala.Exception])def preStart() : scala.Unit = { /* compiled code */ }@scala.throws[T](classOf[scala.Exception])def postStop() : scala.Unit = { /* compiled code */ }@scala.throws[T](classOf[scala.Exception])def preRestart(reason : scala.Throwable, message : scala.Option[scala.Any]) : scala.Unit = { /* compiled code */ }@scala.throws[T](classOf[scala.Exception])def postRestart(reason : scala.Throwable) : scala.Unit = { /* compiled code */ }//發(fā)送給Actor的消息,Actor沒有定義相應(yīng)的處理邏輯時(shí),會(huì)調(diào)用此方法def unhandled(message : scala.Any) : scala.Unit = { /* compiled code */ } } object Actor extends scala.AnyRef {type Receive = scala.PartialFunction[scala.Any, scala.Unit]//空的行為邏輯@scala.SerialVersionUID(1)object emptyBehavior extends scala.AnyRef with akka.actor.Actor.Receive {def isDefinedAt(x : scala.Any) : scala.Boolean = { /* compiled code */ }def apply(x : scala.Any) : scala.Nothing = { /* compiled code */ }}//Sender為null@scala.SerialVersionUID(1)final val noSender : akka.actor.ActorRef = { /* compiled code */ } }
    • 1

    (1) Hook方法,preStart()、postStop()方法的使用

    /**Actor API: Hook方法*/object Example_05 extends App{import akka.actor.Actorimport akka.actor.ActorSystemimport akka.actor.Propsclass FirstActor extends Actor with ActorLogging{//通過context.actorOf方法創(chuàng)建Actorvar child:ActorRef = _//Hook方法,preStart(),Actor啟動(dòng)之前調(diào)用,用于完成初始化工作override def preStart(): Unit ={log.info("preStart() in FirstActor")//通過context上下文創(chuàng)建Actorchild = context.actorOf(Props[MyActor], name = "myChild")}def receive = {//向MyActor發(fā)送消息case x => child ! x;log.info("received "+x)}//Hook方法,postStop(),Actor停止之后調(diào)用override def postStop(): Unit = {log.info("postStop() in FirstActor")}}class MyActor extends Actor with ActorLogging{//Hook方法,preStart(),Actor啟動(dòng)之前調(diào)用,用于完成初始化工作override def preStart(): Unit ={log.info("preStart() in MyActor")}def receive = {case "test" => log.info("received test")case _ => log.info("received unknown message")}//Hook方法,postStop(),Actor停止之后調(diào)用override def postStop(): Unit = {log.info("postStop() in MyActor")}}val system = ActorSystem("MyActorSystem")val systemLog=system.log//創(chuàng)建FirstActor對(duì)象val myactor = system.actorOf(Props[FirstActor], name = "firstActor")systemLog.info("準(zhǔn)備向myactor發(fā)送消息")//向myactor發(fā)送消息myactor!"test"myactor! 123Thread.sleep(5000)//關(guān)閉ActorSystem,停止程序的運(yùn)行system.shutdown()}
    • 1

    代碼運(yùn)行結(jié)果:

    [INFO] [04/02/2016 17:04:49.607] [main] [ActorSystem(MyActorSystem)] 準(zhǔn)備向myactor發(fā)送消息 [INFO] [04/02/2016 17:04:49.607] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] preStart() in FirstActor [INFO] [04/02/2016 17:04:49.607] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] received test [INFO] [04/02/2016 17:04:49.607] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] received 123 [INFO] [04/02/2016 17:04:49.608] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor/myChild] preStart() in MyActor [INFO] [04/02/2016 17:04:49.608] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor/myChild] received test [INFO] [04/02/2016 17:04:49.608] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor/myChild] received unknown message [INFO] [04/02/2016 17:04:54.616] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myChild] postStop() in MyActor [INFO] [04/02/2016 17:04:54.617] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] postStop() in FirstActor

    在代碼

    class FirstActor extends Actor with ActorLogging{//通過context.actorOf方法創(chuàng)建Actorvar child:ActorRef = _//Hook方法,preStart(),Actor啟動(dòng)之前調(diào)用,用于完成初始化工作override def preStart(): Unit ={log.info("preStart() in FirstActor")//通過context上下文創(chuàng)建Actorchild = context.actorOf(Props[MyActor], name = "myChild")}def receive = {//向MyActor發(fā)送消息case x => child ! x;log.info("received "+x)}//Hook方法,postStop(),Actor停止之后調(diào)用,用于完成初始化工作override def postStop(): Unit = {log.info("postStop() in FirstActor")}}

    中分別對(duì)postStop、preStart方法進(jìn)行了重寫,在preStart方法中通過代碼

    child = context.actorOf(Props[MyActor], name = "myChild")

    對(duì)成員變量child進(jìn)行初始化,然后在postStop方法中使用

    //通過context上下文停止MyActor的運(yùn)行context.stop(child)

    停止MyActor的運(yùn)行。在使用代碼

    //創(chuàng)建FirstActor對(duì)象 val myactor = system.actorOf(Props[FirstActor], name = "firstActor")
    • 1

    創(chuàng)建FirstActor時(shí),便會(huì)調(diào)用preStart方法完成MyActor的創(chuàng)建,因此首先會(huì)執(zhí)行FirstActor中的preStart()方法

    dispatcher-4] [akka://MyActorSystem/user/firstActor] preStart() in FirstActor

    然后在創(chuàng)建MyActor時(shí)執(zhí)行MyActor中定義的preStart方法

    [INFO] [04/02/2016 17:04:49.608] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor/myChild] preStart() in MyActor
    • 1

    在執(zhí)行

    //關(guān)閉ActorSystem,停止程序的運(yùn)行system.shutdown()

    FirstActor作為MyActor的Supervisor,會(huì)先停止MyActor,再停止自身,因此先調(diào)用MyActor的postStop方法,再調(diào)用FirstActor的postStop方法。

    (2) 成員變量self及成員方法sender方法的使用

    整體代碼如下:

    /**Actor API:成員變量self及sender()方法的使用*/object Example_05 extends App{import akka.actor.Actorimport akka.actor.ActorSystemimport akka.actor.Propsclass FirstActor extends Actor with ActorLogging{//通過context.actorOf方法創(chuàng)建Actorvar child:ActorRef = _override def preStart(): Unit ={log.info("preStart() in FirstActor")//通過context上下文創(chuàng)建Actorchild = context.actorOf(Props[MyActor], name = "myActor")}def receive = {//向MyActor發(fā)送消息case x => child ! x;log.info("received "+x)}}class MyActor extends Actor with ActorLogging{self!"message from self reference"def receive = {case "test" => log.info("received test");sender()!"message from MyActor"case "message from self reference"=>log.info("message from self refrence")case _ => log.info("received unknown message");}}val system = ActorSystem("MyActorSystem")val systemLog=system.log//創(chuàng)建FirstActor對(duì)象val myactor = system.actorOf(Props[FirstActor], name = "firstActor")systemLog.info("準(zhǔn)備向myactor發(fā)送消息")//向myactor發(fā)送消息myactor!"test"myactor! 123Thread.sleep(5000)//關(guān)閉ActorSystem,停止程序的運(yùn)行system.shutdown()}
    • 1

    運(yùn)行結(jié)果:

    [INFO] [04/02/2016 18:40:37.805] [main] [ActorSystem(MyActorSystem)] 準(zhǔn)備向myactor發(fā)送消息 [INFO] [04/02/2016 18:40:37.805] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] preStart() in FirstActor [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received test [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received 123 [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received test [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] message from self refrence [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received message from MyActor [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message

    代碼:

    class MyActor extends Actor with ActorLogging{self!"message from self reference"def receive = {case "test" => log.info("received test");sender()!"message from MyActor"case "message from self reference"=>log.info("message from self refrence")case _ => log.info("received unknown message");}}

    中使用

    self!"message from self reference"
    • 1

    向自身發(fā)送了一條消息,receive方法通過

    case "message from self reference"=>log.info("message from self refrence")

    對(duì)這條消息進(jìn)行處理。receive方法在處理

    def receive = {case "test" => log.info("received test");sender()!"message from MyActor"

    “test”消息時(shí),會(huì)調(diào)用

    sender()!"message from MyActor"
    • 1

    向sender(本例中為FirstActor)發(fā)送”message from MyActor”消息,FirstActor使用

    def receive = {//MyActor發(fā)送消息case x => child ! x;log.info("received "+x)}
    • 1

    處理消息時(shí)又向MyActor回送該消息,因此最終的輸出有兩個(gè)unknown message,分別對(duì)應(yīng)123和”message from MyActor”

    [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message
    • 1

    (3) unhandled方法的使用

    unhandled方法用于處理沒有被receive方法處理的消息,下面的代碼給出的是當(dāng)不重寫unhandled方法時(shí)的代碼

    /* *Actor API:unhandled方法 */ object Example_06 extends App{import akka.actor.Actorimport akka.actor.ActorSystemimport akka.actor.Propsclass FirstActor extends Actor with ActorLogging{def receive = {//向MyActor發(fā)送消息case "test" => log.info("received test")}}val system = ActorSystem("MyActorSystem")val systemLog=system.log//創(chuàng)建FirstActor對(duì)象val myactor = system.actorOf(Props[FirstActor], name = "firstActor")systemLog.info("準(zhǔn)備向myactor發(fā)送消息")//向myactor發(fā)送消息myactor!"test"myactor! 123Thread.sleep(5000)//關(guān)閉ActorSystem,停止程序的運(yùn)行system.shutdown() }

    代碼輸出:

    [INFO] [04/02/2016 19:14:11.992] [main] [ActorSystem(MyActorSystem)] 準(zhǔn)備向myactor發(fā)送消息 [INFO] [04/02/2016 19:14:11.992] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received test
    • 1

    不難看出,對(duì)于

    myactor! 123

    發(fā)送的這條消息沒有被處理,沒有任何的處理邏輯。在實(shí)際開發(fā)過程中,可能會(huì)對(duì)不能被處理的消息增加一些應(yīng)對(duì)邏輯,此時(shí)可以重寫unhandled方法,代碼如下:

    /* *Actor API:unhandled方法的使用 */ object Example_06 extends App{import akka.actor.Actorimport akka.actor.ActorSystemimport akka.actor.Propsclass FirstActor extends Actor with ActorLogging{def receive = {//向MyActor發(fā)送消息case "test" => log.info("received test")}//重寫unhandled方法override def unhandled(message: Any): Unit = {log.info("unhandled message is {}",message)}}val system = ActorSystem("MyActorSystem")val systemLog=system.log//創(chuàng)建FirstActor對(duì)象val myactor = system.actorOf(Props[FirstActor], name = "firstActor")systemLog.info("準(zhǔn)備向myactor發(fā)送消息")//向myactor發(fā)送消息myactor!"test"myactor! 123Thread.sleep(5000)//關(guān)閉ActorSystem,停止程序的運(yùn)行system.shutdown() }

    代碼輸出結(jié)果:

    [INFO] [04/02/2016 19:17:18.458] [main] [ActorSystem(MyActorSystem)] 準(zhǔn)備向myactor發(fā)送消息 [INFO] [04/02/2016 19:17:18.458] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] received test [INFO] [04/02/2016 19:17:18.458] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] unhandled message is 123
    • 1

    其它如preRestart等方法的使用將在Akka容錯(cuò)部分進(jìn)行講解。

    總結(jié)

    以上是生活随笔為你收集整理的Akka并发编程——第三节:Actor模型(二)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。