日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 运维知识 > Android >内容正文

Android

Android RxJava 3.x 使用总结

發(fā)布時(shí)間:2024/9/30 Android 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Android RxJava 3.x 使用总结 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

轉(zhuǎn)載請(qǐng)標(biāo)明出處:http://blog.csdn.net/zhaoyanjun6/article/details/106720158
本文出自【趙彥軍的博客】

文章目錄

    • 依賴接入
    • Flowable
    • Single
    • Maybe
    • BackpressureStrategy
    • 線程切換
    • concat
      • 例子1

依賴接入

implementation 'io.reactivex.rxjava3:rxandroid:3.0.0' implementation "io.reactivex.rxjava3:rxjava:3.0.4"

Flowable

//java 方式 Flowable.just(1).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Throwable {}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Throwable {}});//或者用 Lambda 簡(jiǎn)寫 Flowable.just(1).subscribe( it -> {}, throwable -> {});

range 一組序列數(shù)據(jù)

Flowable.range(0, 4).subscribe(it -> {//結(jié)果 0 1 2 3}, throwable -> {});

Single

Single只發(fā)射單個(gè)數(shù)據(jù)或錯(cuò)誤事件,即使發(fā)射多個(gè)數(shù)據(jù),后面發(fā)射的數(shù)據(jù)也不會(huì)處理。
只有 onSuccess 和 onError事件,沒有 onNext 、onComplete事件。

SingleEmitter

public interface SingleEmitter<@NonNull T> {void onSuccess(@NonNull T t);void onError(@NonNull Throwable t);void setDisposable(@Nullable Disposable d);void setCancellable(@Nullable Cancellable c);boolean isDisposed();boolean tryOnError(@NonNull Throwable t);}

示例1

Single.create(new SingleOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull SingleEmitter<Integer> emitter) throws Throwable {emitter.onSuccess(1);}}).subscribe(integer -> {}, throwable -> {});

示例2

Single.just(1).subscribe(integer -> {}, throwable -> {});

Maybe

Maybe 是 RxJava2.x 之后才有的新類型,可以看成是Single和Completable的結(jié)合。
Maybe 也只能發(fā)射單個(gè)事件或錯(cuò)誤事件,即使發(fā)射多個(gè)數(shù)據(jù),后面發(fā)射的數(shù)據(jù)也不會(huì)處理。
只有 onSuccess 、 onError 、onComplete事件,沒有 onNext 事件。

public interface MaybeEmitter<@NonNull T> {void onSuccess(@NonNull T t);void onError(@NonNull Throwable t);void onComplete();void setDisposable(@Nullable Disposable d);void setCancellable(@Nullable Cancellable c);boolean isDisposed();boolean tryOnError(@NonNull Throwable t);}

實(shí)例1

Maybe.create(new MaybeOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull MaybeEmitter<Integer> emitter) throws Throwable {emitter.onSuccess(1);emitter.onComplete();}}).subscribe(integer -> {}, throwable -> {});

實(shí)例2

Maybe.just(1).subscribe(integer -> {}, throwable -> {});

BackpressureStrategy

背壓策略

public enum BackpressureStrategy {/*** The {@code onNext} events are written without any buffering or dropping.* Downstream has to deal with any overflow.* <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.*/MISSING,/*** Signals a {@link io.reactivex.rxjava3.exceptions.MissingBackpressureException MissingBackpressureException}* in case the downstream can't keep up.*/ERROR,/*** Buffers <em>all</em> {@code onNext} values until the downstream consumes it.*/BUFFER,/*** Drops the most recent {@code onNext} value if the downstream can't keep up.*/DROP,/*** Keeps only the latest {@code onNext} value, overwriting any previous value if the* downstream can't keep up.*/LATEST }
  • MISSING 策略則表示通過 Create 方法創(chuàng)建的 Flowable 沒有指定背壓策略,不會(huì)對(duì)通過 OnNext 發(fā)射的數(shù)據(jù)做緩存或丟棄處理,需要下游通過背壓操作符
  • BUFFER 策略則在還有數(shù)據(jù)未下發(fā)完成時(shí)就算上游調(diào)用onComplete或onError也會(huì)等待數(shù)據(jù)下發(fā)完成
  • LATEST 策略則當(dāng)產(chǎn)生背壓時(shí)僅會(huì)緩存最新的數(shù)據(jù)
  • DROP 策略為背壓時(shí)丟棄背壓數(shù)據(jù)
  • ERROR 策略是背壓時(shí)拋出異常調(diào)用onError
Flowable.create(new FlowableOnSubscribe<Long>() {@Overridepublic void subscribe(@NonNull FlowableEmitter<Long> emitter) throws Throwable {emitter.onNext(1L);emitter.onNext(2L);emitter.onComplete();}}, BackpressureStrategy.DROP).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(it -> {}, throwable -> {});

線程切換

RxUtil

package com.example.streamimport io.reactivex.rxjava3.android.schedulers.AndroidSchedulers import io.reactivex.rxjava3.core.FlowableTransformer import io.reactivex.rxjava3.core.MaybeTransformer import io.reactivex.rxjava3.core.ObservableTransformer import io.reactivex.rxjava3.core.SingleTransformer import io.reactivex.rxjava3.schedulers.Schedulers/*** @author yanjun.zhao* @time 2020/6/12 8:39 PM* @desc*/object RxUtil {/*** 線程切換*/fun <T> maybeToMain(): MaybeTransformer<T, T> {return MaybeTransformer { upstream ->upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())}}/*** 線程切換*/fun <T> singleToMain(): SingleTransformer<T, T> {return SingleTransformer { upstream ->upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())}}/*** 線程切換*/fun <T> flowableToMain(): FlowableTransformer<T, T> {return FlowableTransformer { upstream ->upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())}}fun <T> observableToMain(): ObservableTransformer<T, T> {return ObservableTransformer { upstream ->upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())}}}

具體實(shí)現(xiàn)

package com.example.streamimport android.os.Bundle import androidx.appcompat.app.AppCompatActivity import io.reactivex.rxjava3.core.Flowable import io.reactivex.rxjava3.core.Maybe import io.reactivex.rxjava3.core.Observable import io.reactivex.rxjava3.core.Singleclass MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)Single.just(1).map {//運(yùn)行在子線程it}.compose(RxUtil.singleToMain()) //線程轉(zhuǎn)換.subscribe({//運(yùn)行在主線程},{it.printStackTrace()})Maybe.just(1).map {//運(yùn)行在子線程it}.compose(RxUtil.maybeToMain()) //線程轉(zhuǎn)換.subscribe({//運(yùn)行在主線程},{it.printStackTrace()})Flowable.just(1).map {//運(yùn)行在子線程it}.compose(RxUtil.flowableToMain()) //線程轉(zhuǎn)換.subscribe({//運(yùn)行在主線程},{it.printStackTrace()})Observable.just(1).map {//運(yùn)行在子線程it}.compose(RxUtil.observableToMain()) //線程轉(zhuǎn)換.subscribe({ it ->//運(yùn)行在主線程},{it.printStackTrace()})} }

concat

Concat操作符連接多個(gè)Observable的輸出,就好像它們是一個(gè)Observable,第一個(gè)Observable發(fā)射的所有數(shù)據(jù)在第二個(gè)Observable發(fā)射的任何數(shù)據(jù)前面,以此類推。

直到前面一個(gè)Observable終止,Concat才會(huì)訂閱額外的一個(gè)Observable。注意:因此,如果你嘗試連接一個(gè)"熱"Observable(這種Observable在創(chuàng)建后立即開始發(fā)射數(shù)據(jù),即使沒有訂閱者),Concat將不會(huì)看到也不會(huì)發(fā)射它之前發(fā)射的任何數(shù)據(jù)。

例子1

private var ob1 = Observable.create<String> {Log.d("concat-數(shù)據(jù)源1", " ${Thread.currentThread().name} ")it.onNext("a1")it.onComplete()}private var ob2 = Observable.create<String> {Log.d("concat-數(shù)據(jù)源2", " ${Thread.currentThread().name} ")it.onNext("a2")it.onComplete()}private var ob3 = Observable.create<String> {Log.d("concat-數(shù)據(jù)源3", " ${Thread.currentThread().name} ")it.onNext("a3")it.onComplete()}Observable.concat<String>(ob1, ob2, ob3).subscribeOn(Schedulers.io()).subscribe{Log.d("concat-結(jié)果", " ${Thread.currentThread().name} " + it)}

結(jié)果是:

concat-數(shù)據(jù)源1: RxCachedThreadScheduler-1 concat-結(jié)果: RxCachedThreadScheduler-1 concat-數(shù)據(jù)源2: RxCachedThreadScheduler-1 concat-結(jié)果: RxCachedThreadScheduler-1 concat-數(shù)據(jù)源3: RxCachedThreadScheduler-1 concat-結(jié)果: RxCachedThreadScheduler-1

結(jié)果分析:

  • concat 輸出結(jié)果是有序的
  • concat 會(huì)使三個(gè)數(shù)據(jù)源都會(huì)執(zhí)行

那么如果我要實(shí)現(xiàn)哪個(gè)數(shù)據(jù)源有數(shù)據(jù),我就用哪個(gè)數(shù)據(jù),一旦獲取到想要的數(shù)據(jù),后續(xù)數(shù)據(jù)源不再執(zhí)行。其實(shí)很簡(jiǎn)單,用 firstElement() ,這個(gè)需求有點(diǎn)像圖片加載流程 先從內(nèi)存取,內(nèi)存沒有從本地文件取,本都文件沒有就請(qǐng)求服務(wù)器。一旦哪個(gè)環(huán)節(jié)獲取到了數(shù)據(jù),立刻停止后面的流程

Observable.concat<String>(ob1, ob2, ob3).firstElement().subscribeOn(Schedulers.io()).subscribe {Log.d("concat-結(jié)果", " ${Thread.currentThread().name} ")}}

運(yùn)行結(jié)果為:

concat-數(shù)據(jù)源1: RxCachedThreadScheduler-1 concat-結(jié)果: RxCachedThreadScheduler-1

總結(jié)

以上是生活随笔為你收集整理的Android RxJava 3.x 使用总结的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。