Android RxJava 3.x 使用总结
轉(zhuǎn)載請(qǐng)標(biāo)明出處:http://blog.csdn.net/zhaoyanjun6/article/details/106720158
本文出自【趙彥軍的博客】
文章目錄
- 依賴接入
- Flowable
- Single
- Maybe
- BackpressureStrategy
- 線程切換
- concat
- 例子1
依賴接入
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0' implementation "io.reactivex.rxjava3:rxjava:3.0.4"Flowable
//java 方式 Flowable.just(1).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Throwable {}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Throwable {}});//或者用 Lambda 簡(jiǎn)寫 Flowable.just(1).subscribe( it -> {}, throwable -> {});range 一組序列數(shù)據(jù)
Flowable.range(0, 4).subscribe(it -> {//結(jié)果 0 1 2 3}, throwable -> {});Single
Single只發(fā)射單個(gè)數(shù)據(jù)或錯(cuò)誤事件,即使發(fā)射多個(gè)數(shù)據(jù),后面發(fā)射的數(shù)據(jù)也不會(huì)處理。
只有 onSuccess 和 onError事件,沒有 onNext 、onComplete事件。
SingleEmitter
public interface SingleEmitter<@NonNull T> {void onSuccess(@NonNull T t);void onError(@NonNull Throwable t);void setDisposable(@Nullable Disposable d);void setCancellable(@Nullable Cancellable c);boolean isDisposed();boolean tryOnError(@NonNull Throwable t);}示例1
Single.create(new SingleOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull SingleEmitter<Integer> emitter) throws Throwable {emitter.onSuccess(1);}}).subscribe(integer -> {}, throwable -> {});示例2
Single.just(1).subscribe(integer -> {}, throwable -> {});Maybe
Maybe 是 RxJava2.x 之后才有的新類型,可以看成是Single和Completable的結(jié)合。
Maybe 也只能發(fā)射單個(gè)事件或錯(cuò)誤事件,即使發(fā)射多個(gè)數(shù)據(jù),后面發(fā)射的數(shù)據(jù)也不會(huì)處理。
只有 onSuccess 、 onError 、onComplete事件,沒有 onNext 事件。
實(shí)例1
Maybe.create(new MaybeOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull MaybeEmitter<Integer> emitter) throws Throwable {emitter.onSuccess(1);emitter.onComplete();}}).subscribe(integer -> {}, throwable -> {});實(shí)例2
Maybe.just(1).subscribe(integer -> {}, throwable -> {});BackpressureStrategy
背壓策略
public enum BackpressureStrategy {/*** The {@code onNext} events are written without any buffering or dropping.* Downstream has to deal with any overflow.* <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.*/MISSING,/*** Signals a {@link io.reactivex.rxjava3.exceptions.MissingBackpressureException MissingBackpressureException}* in case the downstream can't keep up.*/ERROR,/*** Buffers <em>all</em> {@code onNext} values until the downstream consumes it.*/BUFFER,/*** Drops the most recent {@code onNext} value if the downstream can't keep up.*/DROP,/*** Keeps only the latest {@code onNext} value, overwriting any previous value if the* downstream can't keep up.*/LATEST }- MISSING 策略則表示通過 Create 方法創(chuàng)建的 Flowable 沒有指定背壓策略,不會(huì)對(duì)通過 OnNext 發(fā)射的數(shù)據(jù)做緩存或丟棄處理,需要下游通過背壓操作符
- BUFFER 策略則在還有數(shù)據(jù)未下發(fā)完成時(shí)就算上游調(diào)用onComplete或onError也會(huì)等待數(shù)據(jù)下發(fā)完成
- LATEST 策略則當(dāng)產(chǎn)生背壓時(shí)僅會(huì)緩存最新的數(shù)據(jù)
- DROP 策略為背壓時(shí)丟棄背壓數(shù)據(jù)
- ERROR 策略是背壓時(shí)拋出異常調(diào)用onError
線程切換
RxUtil
package com.example.streamimport io.reactivex.rxjava3.android.schedulers.AndroidSchedulers import io.reactivex.rxjava3.core.FlowableTransformer import io.reactivex.rxjava3.core.MaybeTransformer import io.reactivex.rxjava3.core.ObservableTransformer import io.reactivex.rxjava3.core.SingleTransformer import io.reactivex.rxjava3.schedulers.Schedulers/*** @author yanjun.zhao* @time 2020/6/12 8:39 PM* @desc*/object RxUtil {/*** 線程切換*/fun <T> maybeToMain(): MaybeTransformer<T, T> {return MaybeTransformer { upstream ->upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())}}/*** 線程切換*/fun <T> singleToMain(): SingleTransformer<T, T> {return SingleTransformer { upstream ->upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())}}/*** 線程切換*/fun <T> flowableToMain(): FlowableTransformer<T, T> {return FlowableTransformer { upstream ->upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())}}fun <T> observableToMain(): ObservableTransformer<T, T> {return ObservableTransformer { upstream ->upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())}}}具體實(shí)現(xiàn)
package com.example.streamimport android.os.Bundle import androidx.appcompat.app.AppCompatActivity import io.reactivex.rxjava3.core.Flowable import io.reactivex.rxjava3.core.Maybe import io.reactivex.rxjava3.core.Observable import io.reactivex.rxjava3.core.Singleclass MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)Single.just(1).map {//運(yùn)行在子線程it}.compose(RxUtil.singleToMain()) //線程轉(zhuǎn)換.subscribe({//運(yùn)行在主線程},{it.printStackTrace()})Maybe.just(1).map {//運(yùn)行在子線程it}.compose(RxUtil.maybeToMain()) //線程轉(zhuǎn)換.subscribe({//運(yùn)行在主線程},{it.printStackTrace()})Flowable.just(1).map {//運(yùn)行在子線程it}.compose(RxUtil.flowableToMain()) //線程轉(zhuǎn)換.subscribe({//運(yùn)行在主線程},{it.printStackTrace()})Observable.just(1).map {//運(yùn)行在子線程it}.compose(RxUtil.observableToMain()) //線程轉(zhuǎn)換.subscribe({ it ->//運(yùn)行在主線程},{it.printStackTrace()})} }concat
Concat操作符連接多個(gè)Observable的輸出,就好像它們是一個(gè)Observable,第一個(gè)Observable發(fā)射的所有數(shù)據(jù)在第二個(gè)Observable發(fā)射的任何數(shù)據(jù)前面,以此類推。
直到前面一個(gè)Observable終止,Concat才會(huì)訂閱額外的一個(gè)Observable。注意:因此,如果你嘗試連接一個(gè)"熱"Observable(這種Observable在創(chuàng)建后立即開始發(fā)射數(shù)據(jù),即使沒有訂閱者),Concat將不會(huì)看到也不會(huì)發(fā)射它之前發(fā)射的任何數(shù)據(jù)。
例子1
private var ob1 = Observable.create<String> {Log.d("concat-數(shù)據(jù)源1", " ${Thread.currentThread().name} ")it.onNext("a1")it.onComplete()}private var ob2 = Observable.create<String> {Log.d("concat-數(shù)據(jù)源2", " ${Thread.currentThread().name} ")it.onNext("a2")it.onComplete()}private var ob3 = Observable.create<String> {Log.d("concat-數(shù)據(jù)源3", " ${Thread.currentThread().name} ")it.onNext("a3")it.onComplete()}Observable.concat<String>(ob1, ob2, ob3).subscribeOn(Schedulers.io()).subscribe{Log.d("concat-結(jié)果", " ${Thread.currentThread().name} " + it)}結(jié)果是:
concat-數(shù)據(jù)源1: RxCachedThreadScheduler-1 concat-結(jié)果: RxCachedThreadScheduler-1 concat-數(shù)據(jù)源2: RxCachedThreadScheduler-1 concat-結(jié)果: RxCachedThreadScheduler-1 concat-數(shù)據(jù)源3: RxCachedThreadScheduler-1 concat-結(jié)果: RxCachedThreadScheduler-1結(jié)果分析:
- concat 輸出結(jié)果是有序的
- concat 會(huì)使三個(gè)數(shù)據(jù)源都會(huì)執(zhí)行
那么如果我要實(shí)現(xiàn)哪個(gè)數(shù)據(jù)源有數(shù)據(jù),我就用哪個(gè)數(shù)據(jù),一旦獲取到想要的數(shù)據(jù),后續(xù)數(shù)據(jù)源不再執(zhí)行。其實(shí)很簡(jiǎn)單,用 firstElement() ,這個(gè)需求有點(diǎn)像圖片加載流程 先從內(nèi)存取,內(nèi)存沒有從本地文件取,本都文件沒有就請(qǐng)求服務(wù)器。一旦哪個(gè)環(huán)節(jié)獲取到了數(shù)據(jù),立刻停止后面的流程
Observable.concat<String>(ob1, ob2, ob3).firstElement().subscribeOn(Schedulers.io()).subscribe {Log.d("concat-結(jié)果", " ${Thread.currentThread().name} ")}}運(yùn)行結(jié)果為:
concat-數(shù)據(jù)源1: RxCachedThreadScheduler-1 concat-結(jié)果: RxCachedThreadScheduler-1總結(jié)
以上是生活随笔為你收集整理的Android RxJava 3.x 使用总结的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Kotlin实战指南十五:协程泄漏
- 下一篇: android sina oauth2.