从 RxBus 这辆兰博基尼深入进去
又到周五啦,先祝大家周末愉快。
今天繼續發車,本篇文章來自 謝三弟 的投稿,從官方文檔以及源碼入手,介紹了使用RxJava實現RxBus的原理。最后,想容易得看懂本文,前提是需要對RxJava有一定了解。
謝三弟 的博客地址:
http://imxie.cc
很早之前有看過別人實現的 RxBus , 當初也只是隨意用用而已,沒有想過去研究。今天看到 brucezz 天哥在群里分享了一把,自己也加入了討論,下來還實踐了一把,所以想借此篇進入到源碼層,深刻體驗下 RxBus 這輛 “蘭博基尼” 的設計美感和獨特魅力。
準備
推薦先看看 RxBus 的簡單實現和用法,地址在這里:
RxBus的簡單實現
http://brucezz.itscoder.com/articles/2016/06/02/a-simple-rxbus-implementation
解剖
讓我們看看這輛車到底用了些什么?
Subject
SerializedSubject
PublishSubject
CompositeSubscription
官方解釋
這是 Subject 的中文解釋:
Subject 可以看成是一個橋梁或者代理,在某些 ReactiveX 實現中(如 RxJava),它同時充當了 Observer 和 Observable 的角色。因為它是一個Observer,它可以訂閱一個或多個 Observable;又因為它是一個Observable,它可以轉發它收到(Observe)的數據,也可以發射新的數據。
由于一個 Subject 訂閱一個 Observable,它可以觸發這個 Observable 開始發射數據(如果那個Observable是”冷”的–就是說,它等待有訂閱才開始發射數據)。因此有這樣的效果,Subject 可以把原來那個”冷”的 Observable 變成”熱”的。
Subject源碼
源碼:
Subject 只有兩個方法:
hasObservers() 方法的解釋是:
Indicates whether the {@link Subject} has {@link Observer Observers} subscribed to it.
判斷 Subject 是否已經有 observers 訂閱了 有則返回 true。
toSerialized()?方法的解釋是:
Wraps a {@link Subject} so that it is safe to call its various {@code on} methods from different threads.
包裝 Subject 后讓它可以安全的在不同線程中調用各種方法。
為什么調用這個方法后就可以是線程安全了呢?
我們看到?toSerialized()?返回了?SerializedSubject<T, R>?。我們先到這里打住,稍后我們再看看該類做了什么。
PublishSubject解釋
在 RxJava 里有一個抽象類 Subject,既是 Observable 又是 Observer,可以把 Subject 理解成一個管道或者轉發器,數據從一端輸入,然后從另一端輸出。
Subject 有好幾種,這里可以使用最簡單的?PublishSubject。訂閱之后,一旦數據從一端傳入,結果會里立刻從另一端輸出。
源碼里給了用法例子:
串行化
官方文檔推薦我們:
如果你把 Subject 當作一個 Subscriber 使用,注意不要從多個線程中調用它的 onNext方法(包括其它的on系列方法),這可能導致同時(非順序)調用,這會 違反Observable協議,給 Subject 的結果增加了不確定性。
要避免此類問題,你可以將 Subject 轉換為一個 SerializedSubject ,類似于這樣:
所以我們可以看到在 RxBus 初始化的時候我們做了這樣一件事情:
private RxBus() {
? ?BUS = new SerializedSubject<>(PublishSubject.create()); }
為了保證多線程調用中結果的確定性,我們按照官方推薦將 Subject 轉換成了一個 SerializedSubject 。
SerializedSubject
該類同樣是?Subject?的子類,這里貼出該類的構造方法。
我們發現,Subject?最后轉化成了?SerializedObserver。
SerializedObserver
When multiple threads are emitting and/or notifying they will be serialized by:
Allowing only one thread at a time to emit
Adding notifications to a queue if another thread is already emitting
Not holding any locks or blocking any threads while emitting
一次只會允許一個線程進行發送事物
如果其他線程已經準備就緒,會通知給隊列
在發送事物中,不會持有任何鎖和阻塞任何線程
通過介紹可以知道是通過?notifications?來進行并發處理的。
SerializedObserver 類中:
重點看看 nl 在 onNext() 方法里的使用:
NotificationLite
知道哪里具體調用了之后,我們再仔細看看?NotificationLite?。
先來了解它到底是什么:
For use in internal operators that need something like materialize and dematerialize wholly within the implementation of the operator but don’t want to incur the allocation cost of actually creating {@link rx.Notification} objects for every {@link Observer#onNext onNext} and {@link Observer#onCompleted onCompleted}.
It’s implemented as a singleton to maintain some semblance of type safety that is completely non-existent.
大致意思是:作為一個單例類保持這種完全不存在的安全類型的表象。
剛我們在 SerializedObserver 的 onNext() 方法中看到?nl.accept(actual, o)
所以我們再深入到?accept()?方法中:
Unwraps the lite notification and calls the appropriate method on the {@link Observer}.
判斷 lite 通知類別,通知 observer 執行適當方法。
通過 NotificationLite 類圖可以看到有三個標識:
ON_NEXT_NULL_SENTINEL (onNext 標識)
ON_COMPLETED_SENTINEL (onCompleted 標識)
OnErrorSentinel (onError 標識)
與 Observer 回調一致。通過分析得知?accept()?就是通過標識來判斷,然后調用 Observer 相對應的方法。
CompositeSubscription
RxBus 這輛”蘭博基尼”與 CompositeSubscription 車間搭配更好。
構造函數:
內部是初始化了一個 HashSet ,按照哈希算法來存取集合中的對象,存取速度比較快,并且沒有重復對象。
所以我們推薦在基類里實例化一個 CompositeSubscription 對象,使用 CompositeSubscription 來持有所有的 Subscriptions ,然后在 onDestroy()或者 onDestroyView() 里取消所有的訂閱。
參考文章
http://blog.csdn.net/lzyzsd/article/details/45033611
https://mcxiaoke.gitbooks.io/rxdocs/content/Subject.html
能力有限,文章錯誤還望指出,有任何問題都歡迎討論 :)
最后送上我的女神 Gakki (是投稿作者的,不是我的), 開心最好 ( ′?v `? )?。
如果你有好的技術文章想和大家分享,歡迎向我的公眾號投稿,投稿具體細節請在公眾號主頁點擊“投稿”菜單查看。
歡迎長按下圖?->?識別圖中二維碼或者掃一掃關注我的公眾號:
上車請投幣
贊賞
人贊賞
總結
以上是生活随笔為你收集整理的从 RxBus 这辆兰博基尼深入进去的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 用JAVA算养鸡大户王大喜_养鸡大户王大
- 下一篇: Cutting Chains UVA -