日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > java >内容正文

java

RxJava:从未来到可观察

發(fā)布時(shí)間:2023/12/3 java 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RxJava:从未来到可观察 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

大約4年前,我第一次在Matthew Podwysocki的博客上遇到了Reactive Extensions ,但是直到我?guī)字芮翱吹組atthew在Code Mesh上發(fā)表演講之后,我才對(duì)它有所了解。

它似乎最近變得越來越流行,我注意到,現(xiàn)在有一個(gè)由Netflix編寫的Java版本RxJava 。

我以為可以嘗試通過更改在探索cypher的MERGE函數(shù)時(shí)暴露的Observable而不是Future的代碼來嘗試一下。

回顧一下,我們有50個(gè)線程,我們進(jìn)行了100次迭代,在這些迭代中我們創(chuàng)建了隨機(jī)(用戶,事件)對(duì)。 我們最多創(chuàng)建10個(gè)用戶和50個(gè)事件,并且目標(biāo)是同時(shí)發(fā)送相同對(duì)的請(qǐng)求。

在另一篇文章的示例中,我丟棄了每個(gè)查詢的結(jié)果,而在這里我返回了結(jié)果,因此我有一些要訂閱的內(nèi)容。

代碼的輪廓如下所示:

public class MergeTimeRx {public static void main( final String[] args ) throws InterruptedException, IOException{String pathToDb = "/tmp/foo";FileUtils.deleteRecursively( new File( pathToDb ) );GraphDatabaseService db = new GraphDatabaseFactory().newEmbeddedDatabase( pathToDb );final ExecutionEngine engine = new ExecutionEngine( db );int numberOfThreads = 50;int numberOfUsers = 10;int numberOfEvents = 50;int iterations = 100;Observable<ExecutionResult> events = processEvents( engine, numberOfUsers, numberOfEvents, numberOfThreads, iterations );events.subscribe( new Action1<ExecutionResult>(){@Overridepublic void call( ExecutionResult result ){for ( Map<String, Object> row : result ){}}} );....}}

使用RxJava的好處是,沒有提到我們?nèi)绾潍@取ExecutionResult的集合,這并不重要。 我們只有它們的流,并且通過在Observable上調(diào)用訂閱函數(shù),只要有另一個(gè)函數(shù)可用,我們就會(huì)得到通知。

我發(fā)現(xiàn)的大多數(shù)示例都顯示了如何從單個(gè)線程生成事件,但是我想使用線程池,以便可以同時(shí)觸發(fā)許多請(qǐng)求。 processEvents方法最終看起來像這樣:

private static Observable<ExecutionResult> processEvents( final ExecutionEngine engine, final int numberOfUsers, final int numberOfEvents, final int numberOfThreads, final int iterations ){final Random random = new Random();final List<Integer> userIds = generateIds( numberOfUsers );final List<Integer> eventIds = generateIds( numberOfEvents );return Observable.create( new Observable.OnSubscribeFunc<ExecutionResult>(){@Overridepublic Subscription onSubscribe( final Observer<? super ExecutionResult> observer ){final ExecutorService executor = Executors.newFixedThreadPool( numberOfThreads );List<Future<ExecutionResult>> jobs = new ArrayList<>();for ( int i = 0; i < iterations; i++ ){Future<ExecutionResult> job = executor.submit( new Callable<ExecutionResult>(){@Overridepublic ExecutionResult call(){Integer userId = userIds.get( random.nextInt( numberOfUsers ) );Integer eventId = eventIds.get( random.nextInt( numberOfEvents ) );return engine.execute("MERGE (u:User {id: {userId}})\n" +"MERGE (e:Event {id: {eventId}})\n" +"MERGE (u)-[:HAS_EVENT]->(e)\n" +"RETURN u, e",MapUtil.map( "userId", userId, "eventId", eventId ) );}} );jobs.add( job );}for ( Future<ExecutionResult> future : jobs ){try{observer.onNext( future.get() );}catch ( InterruptedException | ExecutionException ignored ){}}observer.onCompleted();executor.shutdown();return Subscriptions.empty();}} );}

我不確定這是否是使用Observable的正確方法,因此如果我記錯(cuò)了,請(qǐng)?jiān)谠u(píng)論中讓我知道。

我不確定處理錯(cuò)誤的正確方法是什么。 我最初在catch塊中調(diào)用了observer#onError ,但這意味著不會(huì)再產(chǎn)生不是我想要的事件。

如果您想使用它,該代碼可以作為要點(diǎn) 。 我添加了以下依賴關(guān)系以獲得RxJava庫:

<dependency><groupId>com.netflix.rxjava</groupId><artifactId>rxjava-core</artifactId><version>0.15.1</version></dependency>

參考: RxJava : 從未來到我們的JCG合作伙伴 Mark Needham在Mark Needham Blog博客上均可觀察到。

翻譯自: https://www.javacodegeeks.com/2014/01/rxjava-from-future-to-observable.html

總結(jié)

以上是生活随笔為你收集整理的RxJava:从未来到可观察的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。