日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

使用反应流API将Akka流与rxJava结合在一起

發(fā)布時間:2023/12/3 java 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 使用反应流API将Akka流与rxJava结合在一起 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

這次只是一篇簡短的文章,因為我仍在嘗試這種東西。 關(guān)于反應(yīng)式編程有很多話題。 在Java 8中,我們有Stream API,我們有rxJava我們有ratpack ,Akka有akka-streams 。

這些實現(xiàn)的主要問題是它們不兼容。 您不能將一個實現(xiàn)的訂閱者連接到另一個實現(xiàn)的發(fā)布者。 幸運的是,一項倡議已經(jīng)開始提供一種方法,使這些不同的實現(xiàn)可以協(xié)同工作:


“本規(guī)范旨在允許創(chuàng)建許多符合標準的實現(xiàn),這些實現(xiàn)將通過遵守規(guī)則將能夠平滑地互操作,并在流應(yīng)用程序的整個處理圖中保留上述好處和特征。”

來自– http://www.reactive-streams.org/

這是如何運作的

現(xiàn)在我們該怎么做? 讓我們看一下基于akka-stream提供的示例的快速示例(從此處開始 )。 在下面的清單中:

package sample.streamimport akka.actor.ActorSystem import akka.stream.FlowMaterializer import akka.stream.scaladsl.{SubscriberSink, PublisherSource, Source} import com.google.common.collect.{DiscreteDomain, ContiguousSet} import rx.RxReactiveStreams import rx.Observable; import scala.collection.JavaConverters._object BasicTransformation {def main(args: Array[String]): Unit = {// define an implicit actorsystem and import the implicit dispatcherimplicit val system = ActorSystem("Sys")import system.dispatcher// flow materializer determines how the stream is realized.// this time as a flow between actors.implicit val materializer = FlowMaterializer()// input text for the stream.val text ="""|Lorem Ipsum is simply dummy text of the printing and typesetting industry.|Lorem Ipsum has been the industry's standard dummy text ever since the 1500s, |when an unknown printer took a galley of type and scrambled it to make a type |specimen book.""".stripMargin// create an observable from a simple list (this is in rxjava style)val first = Observable.from(text.split("\\s").toList.asJava);// convert the rxJava observable to a publisherval publisher = RxReactiveStreams.toPublisher(first);// based on the publisher create an akka sourceval source = PublisherSource(publisher);// now use the akka style syntax to stream the data from the source// to the sink (in this case this is println)source.map(_.toUpperCase). // executed as actorsfilter(_.length > 3).foreach { el => // the sink/consumerprintln(el)}.onComplete(_ => system.shutdown()) // lifecycle event} }

此示例中的代碼注釋幾乎解釋了正在發(fā)生的事情。 我們在這里所做的是創(chuàng)建一個基于rxJava的Observable。 將此Observable轉(zhuǎn)換為“反應(yīng)流”發(fā)布者,并使用此發(fā)布者創(chuàng)建akka-streams源。 對于其余的代碼,我們可以使用akka-stream樣式流API對流進行建模。 在這種情況下,我們只需要進行一些過濾并打印出結(jié)果即可。

翻譯自: https://www.javacodegeeks.com/2014/11/use-reactive-streams-api-to-combine-akka-streams-with-rxjava.html

總結(jié)

以上是生活随笔為你收集整理的使用反应流API将Akka流与rxJava结合在一起的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。