日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程语言 > java >内容正文

java

RxJava + Java8 + Java EE 7 + Arquillian =幸福

發(fā)布時(shí)間:2023/12/3 java 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RxJava + Java8 + Java EE 7 + Arquillian =幸福 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

微服務(wù)是一種體系結(jié)構(gòu)樣式,其中每個(gè)服務(wù)都實(shí)現(xiàn)為一個(gè)獨(dú)立的系統(tǒng)。 他們可以使用自己的持久性系統(tǒng)(盡管不是強(qiáng)制性的),部署,語(yǔ)言等。

由于系統(tǒng)由一個(gè)以上的服務(wù)組成,因此每個(gè)服務(wù)將與其他服務(wù)通信,通常使用輕量級(jí)協(xié)議(如HTTP)并遵循Restful Web方法。 您可以在這里閱讀有關(guān)微服務(wù)的更多信息: http : //martinfowler.com/articles/microservices.html

讓我們看一個(gè)非常簡(jiǎn)單的例子。 假設(shè)我們有一家預(yù)訂商店,用戶可以在其中導(dǎo)航目錄,當(dāng)他們找到想要查看更多信息的書(shū)時(shí),他們單擊isbn,然后打開(kāi)一個(gè)新屏幕,其中包含該書(shū)的詳細(xì)信息和有關(guān)該書(shū)的評(píng)論由讀者撰寫(xiě)。

該系統(tǒng)可能由兩個(gè)服務(wù)組成:

  • 一種獲取書(shū)籍詳細(xì)信息的服務(wù)。 可以從任何傳統(tǒng)系統(tǒng)(如RDBMS)中檢索它們。
  • 一種將所有評(píng)論寫(xiě)在書(shū)中的服務(wù),在這種情況下,信息可以存儲(chǔ)在文檔庫(kù)中。

這里的問(wèn)題是,對(duì)于用戶的每個(gè)請(qǐng)求,我們需要打開(kāi)兩個(gè)連接,每個(gè)服務(wù)一個(gè)。 當(dāng)然,我們需要一種并行執(zhí)行這些工作的方法來(lái)提高性能。 這是一個(gè)問(wèn)題,我們?nèi)绾翁幚懋惒秸?qǐng)求? 第一個(gè)想法是使用Future類(lèi)。 對(duì)于兩個(gè)服務(wù)可能很好,但是如果您需要四個(gè)或五個(gè)服務(wù),則代碼將變得越來(lái)越復(fù)雜,例如,您可能需要從一個(gè)服務(wù)獲取數(shù)據(jù)并將其用于另一服務(wù)或?qū)⒁粋€(gè)服務(wù)的結(jié)果改編為輸入另一個(gè)。 因此,存在線程和同步管理的成本。

有一種干凈整潔的方式來(lái)解決這個(gè)問(wèn)題的方法真是太棒了。 這正是RxJava所做的。 RxJava是Reactive Extensions的Java VM實(shí)現(xiàn):該庫(kù)用于通過(guò)使用可觀察的序列來(lái)組成異步和基于事件的程序。

使用RxJava而不是從結(jié)構(gòu)中提取數(shù)據(jù),而是將數(shù)據(jù)推送到它,該數(shù)據(jù)與訂閱者偵聽(tīng)的事件做出反應(yīng)并一致地起作用。 您可以在https://github.com/Netflix/RxJava中找到更多信息。

因此,在這種情況下,我們將實(shí)現(xiàn)的示例是此處描述的示例,該示例使用RxJava , Java EE 7 , Java 8和Arquillian進(jìn)行測(cè)試。

這篇文章假定您知道如何使用Java EE規(guī)范編寫(xiě)Rest服務(wù)。

因此,讓我們從兩個(gè)服務(wù)開(kāi)始:

@Singleton @Path("bookinfo") public class BookInfoService {@GET@Path("{isbn}")@Produces(MediaType.APPLICATION_JSON)@Consumes(MediaType.APPLICATION_JSON)public JsonObject findBookByISBN(@PathParam("isbn") String isbn) {return Json.createObjectBuilder().add("author", "George R.R. Martin").add("isbn", "1111").add("title", "A Game Of Thrones").build();}}@Singleton @Path("comments") public class CommentsService {@GET@Path("{isbn}")@Produces(MediaType.APPLICATION_JSON)public JsonArray bookComments(@PathParam("isbn") String isbn) {return Json.createArrayBuilder().add("Good Book").add("Awesome").build();}}@ApplicationPath("rest") public class ApplicationResource extends Application { }

最后是時(shí)候創(chuàng)建第三個(gè)外觀服務(wù),該服務(wù)從客戶端接收通信,并行向兩個(gè)服務(wù)發(fā)送請(qǐng)求,最后壓縮兩個(gè)響應(yīng)。 zip是將通過(guò)指定函數(shù)發(fā)出的一組項(xiàng)目組合在一起并將其發(fā)送回客戶端的過(guò)程(不要與壓縮混淆!)。

@Singleton @Path("book") public class BookService {private static final String BOOKSERVICE = "http://localhost:8080/bookservice";private static final String COMMENTSERVICE = "http://localhost:8080/bookcomments";@Resource(name = "DefaultManagedExecutorService")ManagedExecutorService executor;Client bookServiceClient;WebTarget bookServiceTarget;Client commentServiceClient;WebTarget commentServiceTarget;@PostConstructvoid initializeRestClients() {bookServiceClient = ClientBuilder.newClient();bookServiceTarget = bookServiceClient.target(BOOKSERVICE + "/rest/bookinfo");commentServiceClient = ClientBuilder.newClient();commentServiceTarget = commentServiceClient.target(COMMENTSERVICE + "/rest/comments");}@GET@Path("{isbn}")@Produces(MediaType.APPLICATION_JSON)public void bookAndComment(@Suspended final AsyncResponse asyncResponse, @PathParam("isbn") String isbn) {//RxJava code shown below} }

基本上,我們創(chuàng)建一個(gè)新服務(wù)。 在這種情況下,我們將要連接的兩個(gè)服務(wù)的URL都是硬編碼的。 這樣做是出于學(xué)術(shù)目的,但是您將在類(lèi)似于生產(chǎn)的代碼中從生產(chǎn)者類(lèi)或?qū)傩晕募蛴糜诖四康牡娜魏蜗到y(tǒng)中注入它。 然后,我們創(chuàng)建javax.ws.rs.client.WebTarget來(lái)使用Restful Web Service 。

之后,我們需要使用RxJava API實(shí)現(xiàn)bookAndComment方法。

RxJava中使用的主要類(lèi)是rx.Observabl e。 正如他的名字所暗示的那樣,該類(lèi)是可觀察的,它是引發(fā)事件以推動(dòng)對(duì)象的原因。 默認(rèn)情況下,事件是同步的,開(kāi)發(fā)人員有責(zé)任使它們異步。

因此,我們需要為每個(gè)服務(wù)提供一個(gè)異步可觀察實(shí)例:

public Observable<JsonObject> getBookInfo(final String isbn) {return Observable.create((Observable.OnSubscribe<JsonObject>) subscriber -> {Runnable r = () -> {subscriber.onNext(bookServiceTarget.path(isbn).request().get(JsonObject.class));subscriber.onCompleted();};executor.execute(r);}); }

基本上,我們創(chuàng)建一個(gè)Observable ,當(dāng)訂戶訂閱它時(shí)將執(zhí)行指定的功能。 該函數(shù)是使用lambda表達(dá)式創(chuàng)建的,以避免創(chuàng)建嵌套的內(nèi)部類(lèi)。 在這種情況下,由于調(diào)用bookinfo服務(wù),我們將返回JsonObject 。 結(jié)果將傳遞到onNext方法,以便訂閱者可以接收結(jié)果。 因?yàn)槲覀円惒綀?zhí)行此邏輯,所以代碼被包裝在Runnable塊中。

完成所有邏輯后,還需要調(diào)用onCompleted方法。

注意,因?yàn)槌藙?chuàng)建Runnable之外 ,我們還希望使可觀察的異步發(fā)生,所以我們使用Executor在單獨(dú)的線程中運(yùn)行邏輯。 Java EE 7中的一項(xiàng)重大新增功能是在容器內(nèi)創(chuàng)建線程的一種托管方式。 在這種情況下,我們使用容器提供的ManagedExecutorService在當(dāng)前任務(wù)的不同線程中異步跨越任務(wù)。

public Observable<JsonArray> getComments(final String isbn) {return Observable.create((Observable.OnSubscribe<JsonArray>) subscriber -> {Runnable r = () -> {subscriber.onNext(commentServiceTarget.path(isbn).request().get(JsonArray.class));subscriber.onCompleted();};executor.execute(r);}); }

與之前的內(nèi)容類(lèi)似,但沒(méi)有獲取書(shū)籍信息,而是獲得了一系列評(píng)論。

然后,當(dāng)兩個(gè)響應(yīng)均可用時(shí),我們需要?jiǎng)?chuàng)建一個(gè)負(fù)責(zé)將兩個(gè)響應(yīng)壓縮的可觀察對(duì)象。 這是通過(guò)在Observable類(lèi)上使用zip方法完成的,該方法接收兩個(gè)Observable并應(yīng)用一個(gè)函數(shù)來(lái)合并兩個(gè)結(jié)果。 在這種情況下,一個(gè)lambda表達(dá)式將創(chuàng)建一個(gè)新的json對(duì)象,附加兩個(gè)響應(yīng)。

@GET @Path("{isbn}") @Produces(MediaType.APPLICATION_JSON) public void bookAndComment(@Suspended final AsyncResponse asyncResponse, @PathParam("isbn") String isbn) {//Calling previous defined functionsObservable<JsonObject> bookInfo = getBookInfo(isbn);Observable<JsonArray> comments = getComments(isbn);Observable.zip(bookInfo, comments, (JsonObject book, JsonArray bookcomments) ->Json.createObjectBuilder().add("book", book).add("comments", bookcomments).build()).subscribe(new Subscriber<JsonObject>() {@Overridepublic void onCompleted() {}@Overridepublic void onError(Throwable e) {asyncResponse.resume(e);}@Overridepublic void onNext(JsonObject jsonObject) {asyncResponse.resume(jsonObject);}}); }

讓我們看一下以前的服務(wù)。 我們正在使用@Suspended批注來(lái)使用Java EE中的新增功能之一,即Jax-Rs 2.0異步REST端點(diǎn)。 基本上,我們正在做的是釋放服務(wù)器資源,并使用resume方法在響應(yīng)可用時(shí)生成響應(yīng)。

最后是測(cè)試。 我們將Wildfly 8.1用作Java EE 7服務(wù)器和Arquillian 。 因?yàn)槊總€(gè)服務(wù)可以在不同的服務(wù)器上進(jìn)行部署,我們將在不同的戰(zhàn)爭(zhēng) ,而是同一個(gè)服務(wù)器內(nèi)部署各項(xiàng)服務(wù)。

因此,在這種情況下,我們將部署三個(gè)戰(zhàn)爭(zhēng)文件,這在Arquillian中非常容易做到。

@RunWith(Arquillian.class) public class BookTest {@Deployment(testable = false, name = "bookservice")public static WebArchive createDeploymentBookInfoService() {return ShrinkWrap.create(WebArchive.class, "bookservice.war").addClasses(BookInfoService.class, ApplicationResource.class);}@Deployment(testable = false, name = "bookcomments")public static WebArchive createDeploymentCommentsService() {return ShrinkWrap.create(WebArchive.class, "bookcomments.war").addClasses(CommentsService.class, ApplicationResource.class);}@Deployment(testable = false, name = "book")public static WebArchive createDeploymentBookService() {WebArchive webArchive = ShrinkWrap.create(WebArchive.class, "book.war").addClasses(BookService.class, ApplicationResource.class).addAsLibraries(Maven.resolver().loadPomFromFile("pom.xml").resolve("com.netflix.rxjava:rxjava-core").withTransitivity().as(JavaArchive.class));return webArchive;}@ArquillianResourceURL base;@Test@OperateOnDeployment("book")public void should_return_book() throws MalformedURLException {Client client = ClientBuilder.newClient();JsonObject book = client.target(URI.create(new URL(base, "rest/").toExternalForm())).path("book/1111").request().get(JsonObject.class);//assertions} }

在這種情況下,客戶將要求一本書(shū)提供所有信息。 在服務(wù)器部分中, zip方法將等待直到并行檢索書(shū)和注釋,然后將兩個(gè)響應(yīng)組合到一個(gè)對(duì)象中并發(fā)送回客戶端。

這是RxJava的非常簡(jiǎn)單的示例。 實(shí)際上,在這種情況下,我們只看到了如何使用zip方法,但是RxJava提供了許多其他有用的方法,例如take() , map() , merge() ,…( https:// github .com / Netflix / RxJava / wiki / Alphabetical-Observable-Operators列表 )

此外,在此示例中,我們僅看到了連接兩個(gè)服務(wù)并并行檢索信息的示例,您可能想知道為什么不使用Future類(lèi)。 在此示例中使用Future和Callbacks完全可以,但是在現(xiàn)實(shí)生活中,您的邏輯將不像壓縮兩個(gè)服務(wù)那樣容易。 也許您將擁有更多服務(wù),也許您需要從一項(xiàng)服務(wù)中獲取信息,然后針對(duì)每個(gè)結(jié)果打開(kāi)一個(gè)新的連接。 如您所見(jiàn),您可能從兩個(gè)Future實(shí)例開(kāi)始,但以一堆Future.get()方法,超時(shí)等結(jié)束,因此,在這些情況下, RxJava確實(shí)簡(jiǎn)化了應(yīng)用程序的開(kāi)發(fā)。

此外,我們還看到了如何使用Java EE 7的一些新增功能,例如如何使用Jax-Rs開(kāi)發(fā)異步Restful服務(wù)。

在這篇文章中,我們學(xué)習(xí)了如何處理服務(wù)之間的互連以及如何使它們可伸縮并減少資源消耗。 但是,我們沒(méi)有談?wù)撨@些服務(wù)之一發(fā)生故障時(shí)發(fā)生的情況。 來(lái)電者怎么了? 我們有辦法進(jìn)行管理嗎? 當(dāng)其中一項(xiàng)服務(wù)不可用時(shí),是否有一種方法可以不浪費(fèi)資源? 我們將在下一篇關(guān)于容錯(cuò)的文章中對(duì)此進(jìn)行介紹。

我們不斷學(xué)習(xí),

亞歷克斯


邦迪亞,邦迪亞! Bon dia aldematí! Fem for a la mandra I saltem corrents del llit。 (Bon Dia!–DàmarisGelabert)

翻譯自: https://www.javacodegeeks.com/2014/07/rxjava-java8-java-ee-7-arquillian-bliss.html

總結(jié)

以上是生活随笔為你收集整理的RxJava + Java8 + Java EE 7 + Arquillian =幸福的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。