日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

从 RxBus 这辆兰博基尼深入进去

發布時間:2023/12/14 编程问答 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 从 RxBus 这辆兰博基尼深入进去 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.


又到周五啦,先祝大家周末愉快。


今天繼續發車,本篇文章來自 謝三弟 的投稿,從官方文檔以及源碼入手,介紹了使用RxJava實現RxBus的原理。最后,想容易得看懂本文,前提是需要對RxJava有一定了解。


謝三弟 的博客地址:

http://imxie.cc


序言


很早之前有看過別人實現的 RxBus , 當初也只是隨意用用而已,沒有想過去研究。今天看到 brucezz 天哥在群里分享了一把,自己也加入了討論,下來還實踐了一把,所以想借此篇進入到源碼層,深刻體驗下 RxBus 這輛 “蘭博基尼” 的設計美感和獨特魅力。


RxBus


準備


推薦先看看 RxBus 的簡單實現和用法,地址在這里:


RxBus的簡單實現

http://brucezz.itscoder.com/articles/2016/06/02/a-simple-rxbus-implementation


解剖




讓我們看看這輛車到底用了些什么?


  • Subject

  • SerializedSubject

  • PublishSubject

  • CompositeSubscription


從Subject開始發車


官方解釋


這是 Subject 的中文解釋:


Subject 可以看成是一個橋梁或者代理,在某些 ReactiveX 實現中(如 RxJava),它同時充當了 Observer Observable 的角色。因為它是一個Observer,它可以訂閱一個或多個 Observable;又因為它是一個Observable,它可以轉發它收到(Observe)的數據,也可以發射新的數據。


由于一個 Subject 訂閱一個 Observable,它可以觸發這個 Observable 開始發射數據(如果那個Observable是”冷”的–就是說,它等待有訂閱才開始發射數據)。因此有這樣的效果,Subject 可以把原來那個”冷”的 Observable 變成”熱”的。


Subject源碼


源碼:




Subject 只有兩個方法:


hasObservers() 方法的解釋是:


Indicates whether the {@link Subject} has {@link Observer Observers} subscribed to it.

判斷 Subject 是否已經有 observers 訂閱了 有則返回 true


toSerialized()?方法的解釋是:


Wraps a {@link Subject} so that it is safe to call its various {@code on} methods from different threads.

包裝 Subject 后讓它可以安全的在不同線程中調用各種方法。


為什么調用這個方法后就可以是線程安全了呢?


我們看到?toSerialized()?返回了?SerializedSubject<T, R>?。我們先到這里打住,稍后我們再看看該類做了什么。


PublishSubject解釋




RxJava 里有一個抽象類 Subject,既是 Observable 又是 Observer,可以把 Subject 理解成一個管道或者轉發器,數據從一端輸入,然后從另一端輸出。


Subject 有好幾種,這里可以使用最簡單的?PublishSubject。訂閱之后,一旦數據從一端傳入,結果會里立刻從另一端輸出。


源碼里給了用法例子:




串行化


官方文檔推薦我們:


如果你把 Subject 當作一個 Subscriber 使用,注意不要從多個線程中調用它的 onNext方法包括其它的on系列方法),這可能導致同時(非順序)調用,這會 違反Observable協議,給 Subject 的結果增加了不確定性。


要避免此類問題,你可以將 Subject 轉換為一個 SerializedSubject ,類似于這樣:


mySafeSubject = new SerializedSubject(myUnsafeSubject);


所以我們可以看到在 RxBus 初始化的時候我們做了這樣一件事情:


private final Subject<Object, Object> BUS;
private RxBus() {
? ?BUS = new SerializedSubject<>(PublishSubject.create()); }


為了保證多線程調用中結果的確定性,我們按照官方推薦將 Subject 轉換成了一個 SerializedSubject 。


SerializedSubject




該類同樣是?Subject?的子類,這里貼出該類的構造方法。




我們發現,Subject?最后轉化成了?SerializedObserver。


SerializedObserver


When multiple threads are emitting and/or notifying they will be serialized by:

Allowing only one thread at a time to emit

Adding notifications to a queue if another thread is already emitting

Not holding any locks or blocking any threads while emitting


一次只會允許一個線程進行發送事物
如果其他線程已經準備就緒,會通知給隊列
在發送事物中,不會持有任何鎖和阻塞任何線程




通過介紹可以知道是通過?notifications?來進行并發處理的。


SerializedObserver 類中:


private final NotificationLite<T> nl = NotificationLite.instance();


重點看看 nl onNext() 方法里的使用:




NotificationLite


知道哪里具體調用了之后,我們再仔細看看?NotificationLite?。


先來了解它到底是什么:


For use in internal operators that need something like materialize and dematerialize wholly within the implementation of the operator but don’t want to incur the allocation cost of actually creating {@link rx.Notification} objects for every {@link Observer#onNext onNext} and {@link Observer#onCompleted onCompleted}.

It’s implemented as a singleton to maintain some semblance of type safety that is completely non-existent.


大致意思是:作為一個單例類保持這種完全不存在的安全類型的表象。




剛我們在 SerializedObserver onNext() 方法中看到?nl.accept(actual, o)


所以我們再深入到?accept()?方法中:




Unwraps the lite notification and calls the appropriate method on the {@link Observer}.

判斷 lite 通知類別,通知 observer 執行適當方法。


通過 NotificationLite 類圖可以看到有三個標識:


  • ON_NEXT_NULL_SENTINEL (onNext 標識)

  • ON_COMPLETED_SENTINEL (onCompleted 標識)

  • OnErrorSentinel (onError 標識)


Observer 回調一致。通過分析得知?accept()?就是通過標識來判斷,然后調用 Observer 相對應的方法。


CompositeSubscription


RxBus 這輛”蘭博基尼”與 CompositeSubscription 車間搭配更好。




構造函數:




內部是初始化了一個 HashSet ,按照哈希算法來存取集合中的對象,存取速度比較快,并且沒有重復對象。


所以我們推薦在基類里實例化一個 CompositeSubscription 對象,使用 CompositeSubscription 來持有所有的 Subscriptions ,然后在 onDestroy()或者 onDestroyView() 里取消所有的訂閱。


參考文章


http://blog.csdn.net/lzyzsd/article/details/45033611


https://mcxiaoke.gitbooks.io/rxdocs/content/Subject.html


熄火休息


能力有限,文章錯誤還望指出,有任何問題都歡迎討論 :)


最后送上我的女神 Gakki (是投稿作者的,不是我的), 開心最好 ( ′?v `? )?。







如果你有好的技術文章想和大家分享,歡迎向我的公眾號投稿,投稿具體細節請在公眾號主頁點擊“投稿”菜單查看。


歡迎長按下圖?->?識別圖中二維碼或者掃一掃關注我的公眾號:


上車請投幣

贊賞

人贊賞

總結

以上是生活随笔為你收集整理的从 RxBus 这辆兰博基尼深入进去的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。