日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Scala zio-actors与akka-actor集成

發布時間:2023/12/18 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Scala zio-actors与akka-actor集成 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

zio-actors與akka-actor集成

zio-actors 與 akka-actor 是兩種不同實現,分兩種情況:

  • zio actor 發消息給 akka actor
  • akka actor 發消息給 zio actor

依賴

不包括 akka actor 和 zio-actors 依賴,只是集成所需的

"dev.zio" %% "zio-actors-akka-interop" % <VERSION>"

所需的導入如下:

import zio.actors.Actor.Stateful import zio.actors.{ ActorSystem, ActorRef, Context, Supervisor } import zio.actors.akka.{ AkkaTypedActor, AkkaTypedActorRefLocal } import zio.{ IO, Runtime }import akka.actor.typed import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.Scheduler import akka.util.Timeoutimport scala.concurrent.duration._

本章例子的樣例類:

sealed trait TypedMessage[+_] case class PingToZio(zioReplyToActor: ActorRef[ZioMessage], msg: String) extends TypedMessage[Unit] case class PingFromZio(zioSenderActor: ActorRef[ZioMessage]) extends TypedMessage[Unit]sealed trait ZioMessage[+_] case class PongFromAkka(msg: String) extends ZioMessage[Unit] case class Ping(akkaActor: AkkaTypedActorRefLocal[TypedMessage]) extends ZioMessage[Unit]

基本的 actors 使用需要定義一個Stateful來描述 actor 的行為。然后通過監督方式、初始狀態和提到的Stateful來完成 actor 的創建。

在 zio actor 與 akka actor 通信

zio actor Stateful 實現如下:

val handler = new Stateful[Any, String, ZioMessage] {override def receive[A](state: String, msg: ZioMessage[A], context: Context): IO[Throwable, (String, A)] =msg match { case PongFromAkka(msg) => IO.succeed((msg, ())) // zio actor接收akka actor的消息case Ping(akkaActor) => // akkaActor的類型是AkkaTypedActorRefLocal,而不是 akka actor 的ActorReffor {self <- context.self[ZioMessage]_ <- akkaActor ! PingFromZio(self) // 把self帶上用于收回復} yield (state, ())case _=> IO.fail(new Exception("fail"))} }

在 akka actor 中 發送消息到 zio actor

akka actor,需要一個行為(behavior)來定義要處理的消息,在這種情況下向 zio actor 發送和接收消息:

object TestBehavior {lazy val zioRuntime = Runtime.defaultdef apply(): Behavior[TypedMessage[_]] =Behaviors.receiveMessage { message =>message match { case PingToZio(zioReplyToActor, msgToZio) => // 在akka 中發消息,需要unsafeRun執行ZIO effectzioRuntime.unsafeRun(zioReplyToActor ! PongFromAkka(msgToZio)) case PingFromZio(zioSenderActor) => zioRuntime.unsafeRun(zioSenderActor ! PongFromAkka("Pong from Akka"))}Behaviors.same}}

主程序

我們已經準備好開始從 zio 向 akka 發送消息,或者通過fire-and-forget交互模式反過來,但首先我們需要用創建的 akka ActorRef(或ActorSystem)創建一個 ZIO 值,可以使用AkkaTypedActor.make:

for {akkaSystem <- IO(typed.ActorSystem(TestBehavior(), "akkaSystem")) // akka actor 的 ActorSystemsystem <- ActorSystem("zioSystem") // zio actor 的 ActorSystemakkaActor <- AkkaTypedActor.make(akkaSystem) // 使用interop提供的AkkaTypedActor,對akka actor做一次包裝zioActor <- system.make("zioActor", Supervisor.none, "", handler) // 使用zio的ActorSystem創建zio actor_ <- akkaActor ! PingToZio(zioActor, "Ping from Akka") // 發消息給akka actor,并帶上zioActor,用于接收回復_ <- zioActor ! Ping(akkaActor) // 發消息給zio actor,并帶上akkaActor,用于接收回復 } yield ()

zim 中應用

zim 不涉及到2種 actor 通信,websocket 使用的是 akka actor,而在定時任務處使用了 zio actor,實現一個基于 zio actor 的定時器如下:

object ScheduleStateful {val stateful: Stateful[Any, Unit, Command] = new Stateful[Any, Unit, Command] {override def receive[A](state: Unit, msg: Command[A], context: Context): UIO[(Unit, A)] = {val taskIO = msg match {case OnlineUserMessage(descr) =>WsService.getConnections.flatMap { i =>LogUtil.debug(s"${descr.getOrElse("receive")} Total online user => $i")}case _ => UIO.unit}// 這里返回的類型按照zio-actors官網的寫法返回(Unit, A) idea會提示語法錯誤,目前還不知道是誰的問題,只能強制轉換了taskIO.foldM(e => LogUtil.error(s"ScheduleStateful $e").as(() -> "".asInstanceOf[A]),_ => ZIO.succeed(() -> "".asInstanceOf[A]))}} }

根據Stateful創建 actor

lazy val scheduleActor: ZIO[Any, Throwable, ActorRef[protocol.Command]] =actorSystem.flatMap(_.make(Constants.SCHEDULE_JOB_ACTOR, zio.actors.Supervisor.none, (), ScheduleStateful.stateful)).provideLayer(Clock.live ++ InfrastructureConfiguration.live)

啟動 actor,只需要像使用普通方法一樣調用該方法即可:

def scheduleTask: Task[Unit] = {val task = ZioActorSystemConfiguration.scheduleActor.flatMap(f => f ! OnlineUserMessage(Some("scheduleTask"))) repeat Schedule.secondOfMinute(0)// secondOfMinute類似于Cron的時間表,每分鐘的指定秒數重復出現。此處為0秒task.foldM(e => LogUtil.error(s"error => $e").unit,_ => UIO.unit).provideLayer(Clock.live)}

zim 是一個web端即時通訊系統,使用scala2語言,基于zio、tapir、akka,scallikejdbc等庫實現。

總結

以上是生活随笔為你收集整理的Scala zio-actors与akka-actor集成的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。