使用Flowable.generate()生成可感知背压的流– RxJava常见问题解答
RxJava缺少創(chuàng)建無限自然數(shù)流的工廠。 這樣的流很有用,例如,當(dāng)您想通過壓縮兩個事件的唯一序列號給可能的無限事件流時:
Flowable<Long> naturalNumbers = //???Flowable<Event> someInfiniteEventStream = //... Flowable<Pair<Long, Event>> sequenced = Flowable.zip(naturalNumbers,someInfiniteEventStream,Pair::of );實現(xiàn)naturalNumbers令人驚訝地復(fù)雜。 在RxJava 1.x中,您可以短暫地放棄不遵守背壓的Observable :
import rx.Observable; //RxJava 1.xObservable<Long> naturalNumbers = Observable.create(subscriber -> {long state = 0;//poor solution :-(while (!subscriber.isUnsubscribed()) {subscriber.onNext(state++);} });這樣的流沒有背壓是什么意思? 好吧,基本上,流可以輕松地以CPU內(nèi)核允許的速度生成事件( state變量不斷增加),每秒數(shù)百萬。 但是,當(dāng)使用者無法快速使用事件時,未處理事件的積壓開始出現(xiàn):
naturalNumbers // .observeOn(Schedulers.io()).subscribe(x -> {//slooow, 1 millisecond});上面的程序(帶有observeOn()運算符的注釋掉)可以正常運行,因為它具有意外的反壓。 默認情況下,所有內(nèi)容在RxJava中都是單線程的,因此生產(chǎn)者和使用者在同一個線程中工作。 實際上,調(diào)用subscriber.onNext()會阻止,因此while循環(huán)會自動對其進行限制。 但是,嘗試取消注釋observeOn() ,災(zāi)難會在幾毫秒后發(fā)生。 訂閱回調(diào)在設(shè)計上是單線程的。 對于每個元素,它至少需要1毫秒,因此該流每秒可以處理不超過1000個事件。 我們有些幸運。 RxJavaSwift發(fā)現(xiàn)這種災(zāi)難性狀況,并因MissingBackpressureException而快速失敗
我們最大的錯誤是在生成事件時沒有考慮消費者的速度。 順便說一下,這是響應(yīng)流背后的核心思想:不允許生產(chǎn)者發(fā)出比消費者請求更多的事件。 在RxJava 1.x中,即使實現(xiàn)最簡單的流(從頭開始考慮背壓)也不是一件容易的事。 RxJava 2.x帶來了一些便利的運算符,這些運算符建立在先前版本的經(jīng)驗基礎(chǔ)之上。 首先RxJava 2.x時不允許你實現(xiàn)Flowable (背壓-aware)的相同的方式,你可以與Observable 。 創(chuàng)建Flowable會使消費者使消息過載是不可能的:
Flowable<Long> naturalNumbers = Flowable.create(subscriber -> {long state = 0;while (!subscriber.isCancelled()) {subscriber.onNext(state++);} }, BackpressureStrategy.DROP);您是否發(fā)現(xiàn)了這個額外的DROP參數(shù)? 在解釋之前,讓我們看一下使用慢速用戶訂閱時的輸出:
0 1 2 3 //...continuous numbers... 126 127 101811682 //...where did my 100M events go?!? 101811683 101811684 101811685 //...continuous numbers... 101811776 //...17M events disappeared again... 101811777 //...你的旅費可能會改變。 怎么了? observeOn()運算符在調(diào)度程序(線程池)之間切換。 從未決事件隊列中合并的線程池。 此隊列是有限的,容量為128個元素。 知道此限制的observeOn()運算符僅從上游請求128個元素(我們的自定義Flowable )。 此時,它使我們的訂戶可以處理事件,每毫秒1次。 因此,大約100毫秒后, observeOn()發(fā)現(xiàn)其內(nèi)部隊列幾乎為空,并要求更多。 會得到128、129、130…嗎? 沒有! 我們的Flowable在這0.1秒內(nèi)產(chǎn)生了瘋狂的事件,并且(令人驚訝地)在該時間范圍內(nèi)成功產(chǎn)生了超過1億個數(shù)字。 他們?nèi)ツ牧?好吧, observeOn()并沒有要求它們,因此DROP策略(強制性參數(shù))只是丟棄了不需要的事件。
BackpressureStrategy
聽起來不對,還有其他策略嗎? 是的,很多:
- BackpressureStrategy.BUFFER :如果上游產(chǎn)生太多事件,則將它們緩沖在無界隊列中。 沒有任何事件丟失,但是您的整個應(yīng)用程序很可能會丟失。 如果幸運的話, OutOfMemoryError將拯救您。 我停留在5秒以上的GC暫停中。
- BackpressureStrategy.ERROR :如果發(fā)現(xiàn)事件的過度產(chǎn)生,將拋出MissingBackpressureException 。 這是一個理智(安全)的策略。
- BackpressureStrategy.LATEST :類似于DROP ,但是記住上次刪除的事件。 萬一要求提供更多數(shù)據(jù),但我們只是丟棄了所有內(nèi)容–至少具有最后看到的價值。
- BackpressureStrategy.MISSING :沒有安全措施,請加以處理。 下游運算符之一(如observeOn() )最有可能拋出MissingBackpressureException 。
- BackpressureStrategy.DROP :刪除未請求的事件。
順便說一句,當(dāng)您將Observable變?yōu)镕lowable還必須提供BackpressureStrategy 。 RxJava必須知道如何限制過量產(chǎn)生的Observable 。 好的,那么簡單的序列自然數(shù)流的正確實現(xiàn)是什么?
認識
create()和generate()之間的區(qū)別在于責(zé)任。 假設(shè)Flowable.create()會在不考慮背壓的情況下完整地生成流。 它只是在需要時才產(chǎn)生事件。 另一方面,僅允許Flowable.generate()一次生成一個事件(或完成流)。 背壓機制透明地計算出當(dāng)前需要多少個事件。 generate()調(diào)用適當(dāng)?shù)拇螖?shù),例如,在observeOn()情況下, observeOn() 128次。
因為此運算符一次生成一個事件,所以通常需要某種狀態(tài)來確定上次出現(xiàn)的時間1 。 這就是generate()含義:(im)可變狀態(tài)的持有者和基于該狀態(tài)生成下一個事件的函數(shù):
Flowable<Long> naturalNumbers =Flowable.generate(() -> 0L, (state, emitter) -> {emitter.onNext(state);return state + 1;});generate()的第一個參數(shù)是初始狀態(tài)(工廠),在本例中為0L 。 現(xiàn)在,每當(dāng)訂戶或任何下游操作員要求一定數(shù)量的事件時,都會調(diào)用lambda表達式。 它的職責(zé)是根據(jù)提供的狀態(tài)最多調(diào)用一次onNext() (最多發(fā)出一個事件)。 首次調(diào)用lambda時, state等于初始值0L 。 但是,我們可以修改狀態(tài)并返回其新值。 在此示例中,我們增加了long以便隨后的lambda表達式調(diào)用收到state = 1L 。 顯然,這種情況不斷發(fā)生,產(chǎn)生連續(xù)的自然數(shù)。
這樣的編程模型顯然比while循環(huán)難。 它還從根本上改變了實現(xiàn)事件源的方式。 與其在任何時候都想推送事件,不如只是被動地等待請求。 下游運營商和訂戶正在從您的流中提取數(shù)據(jù)。 這種轉(zhuǎn)變可在管道的所有級別上產(chǎn)生背壓。
generate()有一些風(fēng)格。 首先,如果您的狀態(tài)是可變對象,則可以使用不需要返回新狀態(tài)值的重載版本。 盡管功能較少,但可變狀態(tài)往往會產(chǎn)生較少的垃圾。 這假設(shè)您的狀態(tài)不斷變化,并且每次都傳遞相同的狀態(tài)對象實例。 例如,您可以輕松地將Iterator (也是基于pull的!)變成具有反壓奇觀的流:
Iterator<Integer> iter = //...Flowable<String> strings = Flowable.generate(() -> iter, (iterator, emitter) -> {if (iterator.hasNext()) {emitter.onNext(iterator.next().toString());} else {emitter.onComplete();} });注意,流的類型( <String> )不必與狀態(tài)類型( Iterator<Integer> )相同。 當(dāng)然,如果您有Java Collection并想將其轉(zhuǎn)換為流,則不必先創(chuàng)建迭代器。 使用Flowable.fromIterable()足夠了。 甚至更簡單的generate()版本都假定您根本沒有任何狀態(tài)。 例如隨機數(shù)流:
Flowable<Double> randoms = Flowable.generate(emitter -> emitter.onNext(Math.random()));但老實說,您可能最終將需要一個Random實例:
Flowable.generate(Random::new, (random, emitter) -> {emitter.onNext(random.nextBoolean()); });摘要
如您所見,RxJava 1.x中的Observable.create()和Flowable.create Flowable.create()有一些缺點。 如果您真的在乎大量并發(fā)系統(tǒng)的可伸縮性和運行狀況(否則您將不會讀到這篇文章!),則必須了解背壓。 如果您真的需要從頭開始創(chuàng)建流,而不是使用from*()系列方法或繁重工作的各種庫,請熟悉generate() 。 本質(zhì)上,您必須學(xué)習(xí)如何將某些類型的數(shù)據(jù)源建模為奇特的迭代器。 期待有更多文章解釋如何實現(xiàn)更多現(xiàn)實生活流。
這類似于無狀態(tài)HTTP協(xié)議,該協(xié)議在服務(wù)器上使用稱為session *的小狀態(tài)來跟蹤過去的請求。
翻譯自: https://www.javacodegeeks.com/2017/08/generating-backpressure-aware-streams-flowable-generate-rxjava-faq.html
總結(jié)
以上是生活随笔為你收集整理的使用Flowable.generate()生成可感知背压的流– RxJava常见问题解答的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2023中国民营企业研发十强公布:百度、
- 下一篇: 检测Java Web应用程序而无需修改其