日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > java >内容正文

java

图解RxJava2(一)

發(fā)布時間:2025/6/15 java 18 豆豆
生活随笔 收集整理的這篇文章主要介紹了 图解RxJava2(一) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

前言

從這篇文章開始,系統(tǒng)地學(xué)習(xí)RxJava2設(shè)計思想和源碼實現(xiàn)。說起大熱門RxJava,網(wǎng)上有很多例如響應(yīng)式編程、觀察者模式等介紹,也有一些優(yōu)秀的文章以上、下游等概念引初學(xué)者入門,在初步學(xué)習(xí)之后,可能感覺有所收獲,但是總覺得不夠解渴,要真正知曉其原理,還得結(jié)合源碼加深理解。

例子

通過生活中的幾個角色來學(xué)習(xí)RxJava2:飯店、廚師、服務(wù)員、顧客。

模擬一個情景:飯店有一個很火的套餐,顧客來店默認(rèn)就要這個套餐(不存在服務(wù)員咨詢顧客要什么的過程),所以情況應(yīng)該是這樣的

上面的漫畫寫成RxJava2就是很多入門文章中看到的:事件發(fā)起者(上游)

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {System.out.println("服務(wù)員從廚師那取得 扁食");e.onNext("扁食");System.out.println("服務(wù)員從廚師那取得 拌面");e.onNext("拌面");System.out.println("服務(wù)員從廚師那取得 蒸餃");e.onNext("蒸餃");System.out.println("廚師告知服務(wù)員菜上好了");e.onComplete();}}); 復(fù)制代碼

事件接收者(下游)

Observer<String> observer = new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {System.out.println("來個沙縣套餐!!!");}@Overridepublic void onNext(String s) {System.out.println("服務(wù)員端給顧客 " + s);}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {System.out.println("服務(wù)員告訴顧客菜上好了");}}; 復(fù)制代碼

建立聯(lián)系

observable.subscribe(observer); 復(fù)制代碼

打印如下:

來個沙縣套餐!!! 服務(wù)員從廚師那取得 拌面 服務(wù)員端給顧客 拌面 服務(wù)員從廚師那取得 扁食 服務(wù)員端給顧客 扁食 服務(wù)員從廚師那取得 蒸餃 服務(wù)員端給顧客 蒸餃 廚師告知服務(wù)員菜上好了 服務(wù)員告訴顧客菜上好了 復(fù)制代碼

下面把一些類代入角色結(jié)合源碼分析,演員表

源碼分析

最初看源碼的時候容易因為各個類名字起得很相似而看暈,因此先把涉及到的類之間的關(guān)系畫出來

Observable 是個抽象類,其子類是 ObservableCreate ,如果把 Observable 比成飯店,那 ObservableCreate 就是沙縣小吃,看下 Observable 的 create 方法

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {ObjectHelper.requireNonNull(source, "source is null");return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } 復(fù)制代碼

Observable 的 create 方法只是將接收的 ObservableOnSubscribe 作為參數(shù)傳遞給子類 ObservableCreate 真正實例化

public final class ObservableCreate<T> extends Observable<T> {final ObservableOnSubscribe<T> source;public ObservableCreate(ObservableOnSubscribe<T> source) {this.source = source;} ... } 復(fù)制代碼

上面這些代碼就是漫畫的第一格:飯店要開張(Observable.create),開張的前提是要有一個會做菜的廚師(new ObservableOnSubscribe),接著飯店起名叫沙縣小吃(new ObservableCreate),并把這個廚師和沙縣小吃建立聯(lián)系(this.source = source)。廚師有了,但是他并沒有立即開始做菜(ObservableOnSubscribe.subscribe()),這個也很好理解,現(xiàn)實生活中廚師也是這樣,他做不做菜取決于飯店,畢竟是飯店給他開工資;而飯店是否讓廚師做菜很大一個原因取決于有沒有顧客上門,看下顧客:

顧客沒有什么套路,上菜就吃(onNext),菜上完或菜出問題會有相應(yīng)的提醒(onComplete/onError),對應(yīng)上面漫畫2。接著看飯店接客 observable.subscribe(observer) 的源碼

public final void subscribe(Observer<? super T> observer) {//省略部分代碼subscribeActual(observer);//省略部分代碼 }protected abstract void subscribeActual(Observer<? super T> observer); 復(fù)制代碼

Observable(飯店) 的 subscribe 方法又會調(diào)用 subscribeActual 方法,該方法是個抽象方法,具體實現(xiàn)在子類,看看子類 ObservableCreate(沙縣小吃)

protected void subscribeActual(Observer<? super T> observer) {//步驟①CreateEmitter<T> parent = new CreateEmitter<T>(observer);//步驟②observer.onSubscribe(parent);try {//步驟③source.subscribe(parent);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);parent.onError(ex);} } 復(fù)制代碼

先看下涉及到的類以及所屬關(guān)系

步驟① Emitter 翻譯為發(fā)射器,這里名字起得也很形象 CreateEmitter(創(chuàng)建發(fā)射器) ,即對應(yīng)服務(wù)員,CreateEmitter 創(chuàng)建的時候接收 Observer,就像一個服務(wù)員接待一個顧客一樣(對應(yīng)漫畫3服務(wù)員說話)

步驟② 執(zhí)行 onSubscribe 方法并接收 CreateEmitter ,所以看到 log 中最先打印該方法的內(nèi)容,就像顧客認(rèn)準(zhǔn)之后自己的菜是由這個服務(wù)員上的(對應(yīng)漫畫3顧客說話)

步驟③ 調(diào)用 ObservableOnSubscribe.subscribe ,并接收 CreateEmitter ,就像廚師和該服務(wù)員建立聯(lián)系,之后廚師做的菜都由該服務(wù)員端出去。上什么菜取決于 ObservableOnSubscribe.subscribe 中的實現(xiàn)。

分析到這里發(fā)現(xiàn) CreateEmitter(服務(wù)員) 起到樞紐作用,看下代碼中 e.onNext/e.onComplete 的實現(xiàn)

public void onNext(T t) {if (t == null) {onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));return;}if (!isDisposed()) {observer.onNext(t);}}public void onComplete() {if (!isDisposed()) {try {observer.onComplete();} finally {dispose();}}} 復(fù)制代碼

onNext 中首先判空,服務(wù)員端個空盤子出來要被顧客錘成麻瓜;接著發(fā)送之前需要執(zhí)行 isDisposed() 判斷,可以理解成顧客是否還需要菜,默認(rèn)情況下是需要的(!isDisposed() 為 true ),當(dāng)執(zhí)行完 onComplete() 方法后會執(zhí)行 dispose() ,表明顧客不再需要菜了,后續(xù)的菜服務(wù)員不會再端上來給顧客了。

Observer<String> observer = new Observer<String>() {Disposable disposable;@Overridepublic void onSubscribe(Disposable d) {this.disposable = d;System.out.println("來個沙縣套餐!!!");}@Overridepublic void onNext(String s) {if (s.equals("拌面")) {disposable.dispose();}System.out.println("服務(wù)員端給顧客 " + s);}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {System.out.println("服務(wù)員告訴顧客菜上好了");} }; 復(fù)制代碼

打印如下:

來個沙縣套餐!!! 服務(wù)員從廚師那取得 拌面 服務(wù)員端給顧客 拌面 服務(wù)員從廚師那取得 扁食 服務(wù)員從廚師那取得 蒸餃 廚師告知服務(wù)員菜上好了 復(fù)制代碼

從上面可以看到一旦執(zhí)行完 Disposable.dispose() 方法,顧客和服務(wù)員就沒有后續(xù)交流了,就像 Disposable 翻譯的那樣「一次性」,理解成顧客對服務(wù)員說「后續(xù)的菜都別上了,你也不要再出現(xiàn)在我面前」;但是服務(wù)員和廚師的交流還是保持著,默認(rèn)情況下廚師并不知道顧客不需要菜了,因此他還是繼續(xù)做菜,然后交給服務(wù)員端出去。當(dāng)然我們也可以在廚師做下一道菜的之前,判斷下顧客還要不要:

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {if (!e.isDisposed()) {System.out.println("服務(wù)員從廚師那取得 拌面");e.onNext("拌面");}if (!e.isDisposed()) {System.out.println("服務(wù)員從廚師那取得 扁食");e.onNext("扁食");}if (!e.isDisposed()) {System.out.println("服務(wù)員從廚師那取得 蒸餃");e.onNext("蒸餃");}if (!e.isDisposed()) {System.out.println("廚師告知服務(wù)員菜上好了");e.onComplete();}} }); 復(fù)制代碼

打印如下:

來個沙縣套餐!!! 服務(wù)員從廚師那取得 拌面 服務(wù)員端給顧客 拌面 復(fù)制代碼

再看下另一種情況

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {System.out.println("服務(wù)員從廚師那取得 拌面");e.onNext("拌面");System.out.println("服務(wù)員從廚師那取得 扁食");e.onNext("扁食");System.out.println("服務(wù)員從廚師那取得 蒸餃");e.onNext("蒸餃");System.out.println("廚師告知服務(wù)員菜上好了");e.onComplete();} }); observable.subscribe(); 復(fù)制代碼

打印如下

服務(wù)員從廚師那取得 拌面 服務(wù)員從廚師那取得 扁食 服務(wù)員從廚師那取得 蒸餃 廚師告知服務(wù)員菜上好了 復(fù)制代碼

上面分析了要有顧客廚師才會做菜,這都沒顧客怎么也做菜呢?看下源碼

public final Disposable subscribe() {return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer()); }public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete, Consumer<? super Disposable> onSubscribe) {//省略判空//默認(rèn)的顧客LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);subscribe(ls);return ls; }public final void subscribe(Observer<? super T> observer) {subscribeActual(observer); }protected abstract void subscribeActual(Observer<? super T> observer); 復(fù)制代碼

原來系統(tǒng)會默認(rèn)創(chuàng)建一個 LambdaObserver(默認(rèn)顧客) ,服務(wù)員從廚師那端的菜會傳給這個顧客。所以可以看出廚師做不做菜只取決于飯店(Observable.subscribe),后面的流程和上面分析的一致。另外上面的代碼還出現(xiàn)了Consumer、Action類,這些類里也有對事件的處理,可以理解成顧客選擇接收服務(wù)員的哪些信息,在 functions 包下還有其他實現(xiàn)

subscribe() 有下面幾個重載方法:

//顧客只關(guān)心上什么菜 public final Disposable subscribe() {}public final Disposable subscribe(Consumer<? super T> onNext) {}//顧客關(guān)心上什么菜以及菜是不是出問題 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} //顧客關(guān)心上什么菜、菜是不是有問題、菜是不是上完了 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}//顧客關(guān)心上什么菜、菜是不是有問題、菜是不是上完了、 //onSubscribe()中可以獲取Disposable引用,之后選擇告訴服務(wù)員是否繼續(xù)上菜 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}//由顧客自己決定關(guān)心哪些事件,和上一條效果一樣 public final void subscribe(Observer<? super T> observer) {} 復(fù)制代碼

如果顧客只關(guān)心上什么菜,我們可以這么寫:

Consumer<String> consumer = new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {System.out.println("服務(wù)員端給顧客 " + s);} }; observable.subscribe(consumer); 復(fù)制代碼

打印如下:

服務(wù)員從廚師那取得 拌面 服務(wù)員端給顧客 拌面 服務(wù)員從廚師那取得 扁食 服務(wù)員端給顧客 扁食 服務(wù)員從廚師那取得 蒸餃 服務(wù)員端給顧客 蒸餃 廚師告知服務(wù)員菜上好了 復(fù)制代碼

總結(jié)

以上是生活随笔為你收集整理的图解RxJava2(一)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。