日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > java >内容正文

java

retrofit content-length为0_大佬们,一波RxJava 3.0来袭,请做好准备~

發(fā)布時(shí)間:2024/10/6 java 42 豆豆
生活随笔 收集整理的這篇文章主要介紹了 retrofit content-length为0_大佬们,一波RxJava 3.0来袭,请做好准备~ 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

本文作者

作者:新小夢(mèng)

鏈接:

https://juejin.im/post/5d1eeffe6fb9a07f0870b4e8

本文由作者授權(quán)發(fā)布。

0前言

每個(gè)Android開發(fā)者,都是愛RxJava的,簡(jiǎn)潔線程切換和多網(wǎng)絡(luò)請(qǐng)求合并,再配合Retrofit,簡(jiǎn)直是APP開發(fā)的福音。

不知不覺,RxJava一路走來,已經(jīng)更新到第三大版本了。

不像RxJava 2對(duì)RxJava 1那么殘忍,RxJava 3對(duì)RxJava 2的兼容性還是挺好的,目前并沒有做出很大的更改。RxJava2到2020年12月31號(hào)不再提供支持,錯(cuò)誤的會(huì)同時(shí)在2.x和3.x修復(fù),但新功能只會(huì)在3.x上添加。

作為嘗鮮,趕緊品嘗吧。

1主要變化

主要特點(diǎn)

  • 單一依賴:Reactive-Streams

  • 繼續(xù)支持Java 6+和Android 2.3+

  • 修復(fù)了API錯(cuò)誤和RxJava 2的許多限制

  • 旨在替代RxJava 2,具有相對(duì)較少的二進(jìn)制不兼容更改

  • 提供Java 8 lambda友好的API

  • 關(guān)于并發(fā)源的不同意見

  • 異步或同步執(zhí)行

  • 參數(shù)化并發(fā)的虛擬時(shí)間和調(diào)度程序

  • 為測(cè)試schedulers,consumers和plugin hooks提供測(cè)試和診斷支持

與RxJava 2的主要區(qū)別是:

  • 將eagerTruncate添加到replay運(yùn)算符,以便head節(jié)點(diǎn)將在截?cái)鄷r(shí)丟失它保留的項(xiàng)引用

  • 新增 X.fromSupplier()

  • 使用 Scheduler 添加 concatMap,保證 mapper 函數(shù)的運(yùn)行位置

  • 新增 startWithItem 和 startWithIterable

  • ConnectableFlowable/ConnetableFlowable 重新設(shè)計(jì)

  • 將 as() 并入 to()

  • 更改 Maybe.defaultIfEmpty() 以返回 Single

  • 用 Supplier 代替 Callable

  • 將一些實(shí)驗(yàn)操作符推廣到標(biāo)準(zhǔn)

  • 從某些主題/處理器中刪除 getValues()

  • 刪除 replay(Scheduler) 及其重載

  • 刪除 dematerialize()

  • 刪除 startWith(T|Iterable)

  • 刪除 as()

  • 刪除 Maybe.toSingle(T)

  • 刪除 Flowable.subscribe(4 args)

  • 刪除 Observable.subscribe(4 args)

  • 刪除 Single.toCompletable()

  • 刪除 Completable.blockingGet()

到這里就結(jié)束了,想知道的都知道了。2入門

1、添加依賴

implementation?"io.reactivex.rxjava3:rxjava:3.0.0-RC0"

2、一些概念

2.1、上流、下流

在RxJava,數(shù)據(jù)以流的方式組織。也就是說,Rxjava包括一個(gè)源的數(shù)據(jù)流,數(shù)據(jù)流后跟著消費(fèi)者的零個(gè)到多個(gè)消費(fèi)數(shù)據(jù)流步驟。

source
??.operator1()
??.operator2()
??.operator3()
??.subscribe(consumer)

在上文代碼中,對(duì)于operator2來說,在它前面叫做上流,在它后面的叫做下流。憋住,別笑,真的是下流來的。

2.2、流的對(duì)象

在RxJava的文檔中,emission, emits, item, event, signal, data and message都被認(rèn)為在數(shù)據(jù)流中被傳遞的數(shù)據(jù)對(duì)象。

2.3、背壓(Backpressure)

當(dāng)數(shù)據(jù)流通過異步的步驟執(zhí)行時(shí),這些步驟的執(zhí)行速度可能不一致。也就是說上流數(shù)據(jù)發(fā)送太快,下流沒有足夠的能力去處理。為了避免這種情況,一般要么緩存上流的數(shù)據(jù),要么拋棄數(shù)據(jù)。但這種處理方式,有時(shí)會(huì)帶來很大的問題。

為此,RxJava帶來了backpressure的概念。背壓是一種流量的控制步驟,在不知道上流還有多少數(shù)據(jù)的情形下控制內(nèi)存的使用,表示它們還能處理多少數(shù)據(jù)。

支持背壓的有Flowable類,不支持背壓的有Observable,Single, Maybe and Completable類。

2.4 線程調(diào)度器(Schedulers)

對(duì)于我們Android開發(fā)來說,最喜歡的就是它簡(jiǎn)潔切換線程的操作。RxJava通過調(diào)度器來方便線程的切換。

  • Schedulers.computation(): 適合運(yùn)行在密集計(jì)算的操作,大多數(shù)異步操作符使用該調(diào)度器。

  • Schedulers.io():適合運(yùn)行I/0和阻塞操作.

  • Schedulers.single():適合需要單一線程的操作

  • Schedulers.trampoline(): 適合需要順序運(yùn)行的操作

在不同平臺(tái)還有不同的調(diào)度器,例如Android的主線程:AndroidSchedulers.mainThread()

Flowable.range(1,?10)
??.observeOn(Schedulers.computation())
??.map(v?->?v?*?v)
??.blockingSubscribe(System.out::println);

2.5 基類

在 RxJava 3 可以發(fā)現(xiàn)有以下幾個(gè)基類(跟RxJava 2是一致的吧):

  • io.reactivex.Flowable:發(fā)送0個(gè)N個(gè)的數(shù)據(jù),支持Reactive-Streams和背壓

  • io.reactivex.Observable:發(fā)送0個(gè)N個(gè)的數(shù)據(jù),不支持背壓,

  • io.reactivex.Single:只能發(fā)送單個(gè)數(shù)據(jù)或者一個(gè)錯(cuò)誤

  • io.reactivex.Completable:沒有發(fā)送任何數(shù)據(jù),但只處理 onComplete 和 onError 事件。

  • io.reactivex.Maybe:能夠發(fā)射0或者1個(gè)數(shù)據(jù),要么成功,要么失敗。

不建議再往下看了,建議點(diǎn)贊或收藏...

下文關(guān)于操作符內(nèi)容太多了。

等需要了,再來查閱

下班時(shí)間還是好好護(hù)發(fā)吧

https://github.com/GitCode8/GitCode/blob/master/README.md

3操作符:實(shí)用操作符

1、ObserveOn

指定觀察者的線程,例如在Android訪問網(wǎng)絡(luò)后,數(shù)據(jù)需要主線程消費(fèi),那么將觀察者的線程切換到主線就需要ObserveOn操作符。每次指定一次都會(huì)生效。

2、subscribeOn

指定被觀察者的線程,即數(shù)據(jù)源發(fā)生的線程。例如在Android訪問網(wǎng)絡(luò)時(shí),需要將線程切換到子線程。多次指定只有第一次有效。

3、doOnEach

數(shù)據(jù)源(Observable)每發(fā)送一次數(shù)據(jù),就調(diào)用一次。

4、doOnNext

數(shù)據(jù)源每次調(diào)用onNext() 之前都會(huì)先回調(diào)該方法。

5、doOnError

數(shù)據(jù)源每次調(diào)用onError() 之前會(huì)回調(diào)該方法。

6、doOnComplete

數(shù)據(jù)源每次調(diào)用onComplete() 之前會(huì)回調(diào)該方法

7、doOnSubscribe

數(shù)據(jù)源每次調(diào)用onSubscribe() 之后會(huì)回調(diào)該方法

8、doOnDispose

數(shù)據(jù)源每次調(diào)用dispose() 之后會(huì)回調(diào)該方法

其他的見官網(wǎng)吧,不難

https://github.com/ReactiveX/RxJava/wiki/Observable-Utility-Operators

對(duì)數(shù)據(jù)源過濾操作符

主要講對(duì)數(shù)據(jù)源進(jìn)行選擇和過濾的常用操作符

1、skip(跳過)

可以作用于Flowable,Observable,表示源發(fā)射數(shù)據(jù)前,跳過多少個(gè)。例如下面跳過前四個(gè):

Observable?source?=?Observable.just(1,?2,?3,?4,?5,?6,?7,?8,?9,?10);
source.skip(4)
????.subscribe(System.out::print);
打印結(jié)果:5678910
Observable?source?=?Observable.just(1,?2,?3,?4,?5,?6,?7,?8,?9,?10);
source.skipLast(4)
????.subscribe(System.out::print);
打印結(jié)果:1 2 3 4 5 6

skipLast(n)操作表示從流的尾部跳過n個(gè)元素。

2、debounce(去抖動(dòng))

可作用于Flowable,Observable。在Android開發(fā),通常為了防止用戶重復(fù)點(diǎn)擊而設(shè)置標(biāo)記位,而通過RxJava的debounce操作符可以有效達(dá)到該效果。在規(guī)定時(shí)間內(nèi),用戶重復(fù)點(diǎn)擊只有最后一次有效,

Observable?source?=?Observable.create(emitter?->?{
????emitter.onNext("A");
????Thread.sleep(1_500);
????emitter.onNext("B");
????Thread.sleep(500);
????emitter.onNext("C");
????Thread.sleep(250);
????emitter.onNext("D");
????Thread.sleep(2_000);
????emitter.onNext("E");
????emitter.onComplete();
});
source.subscribeOn(Schedulers.io())
????????.debounce(1,?TimeUnit.SECONDS)
????????.blockingSubscribe(
????????????????item?->?System.out.print(item+"?"),
????????????????Throwable::printStackTrace,
????????????????()?->?System.out.println("onComplete"));
打印:A D E onComplete

上文代碼中,數(shù)據(jù)源以一定的時(shí)間間隔發(fā)送A,B,C,D,E。操作符debounce的時(shí)間設(shè)為1秒,發(fā)送A后1.5秒并沒有發(fā)射其他數(shù)據(jù),所以A能成功發(fā)射。

發(fā)射B后,在1秒之內(nèi),又發(fā)射了C和D,在D之后的2秒才發(fā)射E,所有B、C都失效,只有D有效;而E之后已經(jīng)沒有其他數(shù)據(jù)流了,所有E有效。

3、distinct(去重)

可作用于Flowable,Observable,去掉數(shù)據(jù)源重復(fù)的數(shù)據(jù)。

Observable.just(2,?3,?4,?4,?2,?1)
????????.distinct()
????????.subscribe(System.out::print);

//?打印:2?3?4?1
Observable.just(1,?1,?2,?1,?2,?3,?3,?4)
????????.distinctUntilChanged()
????????.subscribe(System.out::print);
//打印:1 2 1 2 3 4

distinctUntilChanged()去掉相鄰重復(fù)數(shù)據(jù)。

4、elementAt(獲取指定位置元素)

可作用于Flowable,Observable,從數(shù)據(jù)源獲取指定位置的元素,從0開始。

?Observable.just(2,4,3,1,5,8)
????????.elementAt(0)
????????.subscribe(integer?->?
?????????Log.d("TAG","elmentAt->"+integer));
打印:2

Observable<String>?source?=?Observable.just("Kirk",?"Spock",?"Chekov",?"Sulu");
Single<String>?element?=?source.elementAtOrError(4);

element.subscribe(
????name?->?System.out.println("onSuccess?will?not?be?printed!"),
????error?->?System.out.println("onError:?"?+?error));
打印:onSuccess will not?be?printed!

elementAtOrError:指定元素的位置超過數(shù)據(jù)長(zhǎng)度,則發(fā)射異常。

5、filter(過濾)

可作用于 Flowable,Observable,Maybe,Single。在filter中返回表示發(fā)射該元素,返回false表示過濾該數(shù)據(jù)。

Observable.just(1,?2,?3,?4,?5,?6)
????????.filter(x?->?x?%?2?==?0)
????????.subscribe(System.out::print);
打印:2?4?6

6、first(第一個(gè))

作用于 Flowable,Observable。發(fā)射數(shù)據(jù)源第一個(gè)數(shù)據(jù),如果沒有則發(fā)送默認(rèn)值。

Observable<String>?source?=?Observable.just("A",?"B",?"C");
Single<String>?firstOrDefault?=?source.first("D");
firstOrDefault.subscribe(System.out::println);
打印:A

Observable<String>?emptySource?=?Observable.empty();
Single<String>?firstOrError?=?emptySource.firstOrError();
firstOrError.subscribe(
????????element?->?System.out.println("onSuccess?will?not?be?printed!"),
????????error?->?System.out.println("onError:?"?+?error));
打印:onError: java.util.NoSuchElementException

和firstElement的區(qū)別是first返回的是Single,而firstElement返回Maybe。firstOrError在沒有數(shù)據(jù)會(huì)返回異常。

7、last(最后一個(gè))

last、lastElement、lastOrError與fist、firstElement、firstOrError相對(duì)應(yīng)。

Observable<String>?source?=?Observable.just("A",?"B",?"C");
Single<String>?lastOrDefault?=?source.last("D");
lastOrDefault.subscribe(System.out::println);
//打印:C

Observable<String>?source?=?Observable.just("A",?"B",?"C");
Maybe<String>?last?=?source.lastElement();
last.subscribe(System.out::println);
//打印:C

Observable<String>?emptySource?=?Observable.empty();
Single<String>?lastOrError?=?emptySource.lastOrError();
lastOrError.subscribe(
????????element?->?System.out.println("onSuccess?will?not?be?printed!"),
????????error?->?System.out.println("onError:?"?+?error));
//?打印:onError: java.util.NoSuchElementException

8、ignoreElements & ignoreElement(忽略元素)

ignoreElements 作用于Flowable、Observable。ignoreElement作用于Maybe、Single。兩者都是忽略掉數(shù)據(jù),返回完成或者錯(cuò)誤時(shí)間。

Single?source?=?Single.timer(1,?TimeUnit.SECONDS);
Completable?completable?=?source.ignoreElement();
completable.doOnComplete(()?->?System.out.println("Done!"))
????????.blockingAwait();//?1秒后打印:Donde!
Observable?source?=?Observable.intervalRange(1,?5,?1,?1,?TimeUnit.SECONDS);
Completable?completable?=?source.ignoreElements();
completable.doOnComplete(()?->?System.out.println("Done!"))
????????.blockingAwait();//?五秒后打印:Done!

9、ofType(過濾掉類型)

作用于Flowable、Observable、Maybe、過濾掉類型。

Observable?numbers?=?Observable.just(1,?4.0,?3,?2.71,?2f,?7);
Observable?integers?=?numbers.ofType(Integer.class);
integers.subscribe((Integer?x)?->?System.out.print(x+"?"));//打印:1?3?7

10、sample

作用于Flowable、Observable,在一個(gè)周期內(nèi)發(fā)射最新的數(shù)據(jù)。

Observable?source?=?Observable.create(emitter?->?{
????emitter.onNext("A");
????Thread.sleep(500);
????emitter.onNext("B");
????Thread.sleep(200);
????emitter.onNext("C");
????Thread.sleep(800);
????emitter.onNext("D");
????Thread.sleep(600);
????emitter.onNext("E");
????emitter.onComplete();
});
source.subscribeOn(Schedulers.io())
????????.sample(1,?TimeUnit.SECONDS)
????????.blockingSubscribe(
????????????????item?->?System.out.print(item+"?"),
????????????????Throwable::printStackTrace,
????????????????()?->?System.out.print("onComplete"));//?打印:C D onComplete

與debounce的區(qū)別是,sample是以時(shí)間為周期的發(fā)射,一秒又一秒內(nèi)的最新數(shù)據(jù)。而debounce是最后一個(gè)有效數(shù)據(jù)開始。

11、throttleFirst & throttleLast & throttleWithTimeout

作用于Flowable、Observable。throttleLast與smaple一致,而throttleFirst是指定周期內(nèi)第一個(gè)數(shù)據(jù)。throttleWithTimeout與debounce一致。

Observable?source?=?Observable.create(emitter?->?{
????emitter.onNext("A");
????Thread.sleep(500);
????emitter.onNext("B");
????Thread.sleep(200);
????emitter.onNext("C");
????Thread.sleep(800);
????emitter.onNext("D");
????Thread.sleep(600);
????emitter.onNext("E");
????emitter.onComplete();
});
source.subscribeOn(Schedulers.io())
????????.throttleFirst(1,?TimeUnit.SECONDS)
????????.blockingSubscribe(
????????????????item?->?System.out.print(item+"?"),
????????????????Throwable::printStackTrace,
????????????????()?->?System.out.print("?onComplete"));//打印:A?D?onComplete
source.subscribeOn(Schedulers.io())
????????.throttleLast(1,?TimeUnit.SECONDS)
????????.blockingSubscribe(
????????????????item?->?System.out.print(item+"?"),
????????????????Throwable::printStackTrace,
????????????????()?->?System.out.print("?onComplete"));//?打印:C?D?onComplete

12、throttleLatest

之所以拿出來單獨(dú)說,我看不懂官網(wǎng)的解釋。然后看別人的文章:throttleFirst+throttleLast的組合?開玩笑的吧。個(gè)人理解是:如果源的第一個(gè)數(shù)據(jù)總會(huì)被發(fā)射,然后開始周期計(jì)時(shí),此時(shí)的效果就會(huì)跟throttleLast一致。

Observable?source?=?Observable.create(emitter?->?{
????????????emitter.onNext("A");
????????????Thread.sleep(500);
????????????emitter.onNext("B");
????????????Thread.sleep(200);
????????????emitter.onNext("C");
????????????Thread.sleep(200);
????????????emitter.onNext("D");
????????????Thread.sleep(400);
????????????emitter.onNext("E");
????????????Thread.sleep(400);
????????????emitter.onNext("F");
????????????Thread.sleep(400);
????????????emitter.onNext("G");
????????????Thread.sleep(2000);
????????????emitter.onComplete();
????????});
????????source.subscribeOn(Schedulers.io())
????????.throttleLatest(1,?TimeUnit.SECONDS)
????????.blockingSubscribe(
????????????item?->?Log.e("RxJava",item),
?????????????????Throwable::printStackTrace,
????????????()?->?Log.e("RxJava","finished"));

打印結(jié)果:

13、take & takeLast

作用于Flowable、Observable,take發(fā)射前n個(gè)元素;takeLast發(fā)射后n個(gè)元素。

Observable?source?=?Observable.just(1,?2,?3,?4,?5,?6,?7,?8,?9,?10);
source.take(4)
????.subscribe(System.out::print);//打印:1?2?3?4
source.takeLast(4)
????.subscribe(System.out::println);//打印:7?8?9?10

14、timeout(超時(shí))

作用于Flowable、Observable、Maybe、Single、Completabl。后一個(gè)數(shù)據(jù)發(fā)射未在前一個(gè)元素發(fā)射后規(guī)定時(shí)間內(nèi)發(fā)射則返回超時(shí)異常。

Observable?source?=?Observable.create(emitter?->?{
????emitter.onNext("A");
????Thread.sleep(800);
????emitter.onNext("B");
????Thread.sleep(400);
????emitter.onNext("C");
????Thread.sleep(1200);
????emitter.onNext("D");
????emitter.onComplete();
});
source.timeout(1,?TimeUnit.SECONDS)
????????.subscribe(
????????????????item?->?System.out.println("onNext:?"?+?item),
????????????????error?->?System.out.println("onError:?"?+?error),
????????????????()?->?System.out.println("onComplete?will?not?be?printed!"));//?打印://?onNext:?A//?onNext:?B//?onNext:?C//?onError:?java.util.concurrent.TimeoutException:?
????????????The?source?did?not?signal?an?event?for?1?seconds?and?has?been?terminated.4操作符:連接操作符

通過連接操作符,將多個(gè)被觀察數(shù)據(jù)(數(shù)據(jù)源)連接在一起。

1、startWith

可作用于Flowable、Observable。將指定數(shù)據(jù)源合并在另外數(shù)據(jù)源的開頭。

Observable?names?=?Observable.just("Spock",?"McCoy");
Observable?otherNames?=?Observable.just("Git",?"Code","8");
names.startWith(otherNames).subscribe(item?->?Log.d(TAG,item));
//打印:RxJava:?GitRxJava:?CodeRxJava:?8RxJava:?SpockRxJava:?McCo

2、merge

可作用所有數(shù)據(jù)源類型,用于合并多個(gè)數(shù)據(jù)源到一個(gè)數(shù)據(jù)源。

Observable<String>?names?=?Observable.just("Hello",?"world");
Observable<String>?otherNames?=?Observable.just("Git",?"Code","8");

Observable.merge(names,otherNames).subscribe(name?->?Log.d(TAG,name));

//也可以是
//names.mergeWith(otherNames).subscribe(name?->?Log.d(TAG,name));

//打印:
RxJava:?Hello
RxJava:?world
RxJava:?Git
RxJava:?Code
RxJava:?8

merge在合并數(shù)據(jù)源時(shí),如果一個(gè)合并發(fā)生異常后會(huì)立即調(diào)用觀察者的onError方法,并停止合并。可通過mergeDelayError操作符,將發(fā)生的異常留到最后處理。

Observable<String>?names?=?Observable.just("Hello",?"world");?
Observable<String>?otherNames?=?Observable.just("Git",?"Code","8");
Observable<String>?error?=?Observable.error(????
????????????????????????????new?NullPointerException("Error!"));
Observable.mergeDelayError(names,error,otherNames).subscribe(
????name?->?Log.d(TAG,name),?e->Log.d(TAG,e.getMessage()));

//打印:
RxJava:?Hello
RxJava:?world
RxJava:?Git
RxJava:?Code
RxJava:?8
RxJava:?Error!

3、zip

可作用于Flowable、Observable、Maybe、Single。將多個(gè)數(shù)據(jù)源的數(shù)據(jù)一個(gè)一個(gè)的合并在一起哇。當(dāng)其中一個(gè)數(shù)據(jù)源發(fā)射完事件之后,若其他數(shù)據(jù)源還有數(shù)據(jù)未發(fā)射完畢,也會(huì)停止。

Observable?names?=?Observable.just("Hello",?"world");
Observable?otherNames?=?Observable.just("Git",?"Code",?"8");
names.zipWith(otherNames,?(first,?last)?->?first?+?"-"?+?last)
???????.subscribe(item?->?Log.d(TAG,?item));//打印:
RxJava:?Hello-Git
RxJava:?world-Code

4、combineLatest

可作用于Flowable, Observable。在結(jié)合不同數(shù)據(jù)源時(shí),發(fā)射速度快的數(shù)據(jù)源最新item與較慢的相結(jié)合。如下時(shí)間線,Observable-1發(fā)射速率快,發(fā)射了65,Observable-2才發(fā)射了C, 那么兩者結(jié)合就是C5。

5、switchOnNext

一個(gè)發(fā)射多個(gè)小數(shù)據(jù)源的數(shù)據(jù)源,這些小數(shù)據(jù)源發(fā)射數(shù)據(jù)的時(shí)間發(fā)生重復(fù)時(shí),取最新的數(shù)據(jù)源。

5操作符:?變換操作符

變化數(shù)據(jù)源的數(shù)據(jù),并轉(zhuǎn)化為新的數(shù)據(jù)源。

1、buffer

作用于Flowable、Observable。指將數(shù)據(jù)源拆解含有長(zhǎng)度為n的list的多個(gè)數(shù)據(jù)源,不夠n的成為一個(gè)數(shù)據(jù)源。

Observable.range(0,?10)
????.buffer(4)
????.subscribe((List?buffer)?->?System.out.println(buffer));//?打印://?[0,?1,?2,?3]//?[4,?5,?6,?7]//?[8,?9]

2、cast

作用于Flowable、Observable、Maybe、Single。將數(shù)據(jù)元素轉(zhuǎn)型成其他類型,轉(zhuǎn)型失敗會(huì)拋出異常。

Observable?numbers?=?Observable.just(1,?4.0,?3f,?7,?12,?4.6,?5);
numbers.filter((Number?x)?->?Integer.class.isInstance(x))
????.cast(Integer.class)
????.subscribe((Integer?x)?->?System.out.println(x));//?prints://?1//?7//?12//?5

3、concatMap

作用于Flowable、Observable、Maybe。將數(shù)據(jù)源的元素作用于指定函數(shù)后,將函數(shù)的返回值有序的存在新的數(shù)據(jù)源。

Observable.range(0,?5)
????.concatMap(i?->?{
????????long?delay?=?Math.round(Math.random()?*?2);

????????return?Observable.timer(delay,?TimeUnit.SECONDS).map(n?->?i);
????})
????.blockingSubscribe(System.out::print);

//?prints?01234

4、concatMapDelayError

與concatMap作用相同,只是將過程發(fā)送的所有錯(cuò)誤延遲到最后處理。

Observable.intervalRange(1,?3,?0,?1,?TimeUnit.SECONDS)
????.concatMapDelayError(x?->?{
????????if?(x.equals(1L))?return?Observable.error(new?IOException("Something?went?wrong!"));
????????else?return?Observable.just(x,?x?*?x);
????})
????.blockingSubscribe(
????????x?->?System.out.println("onNext:?"?+?x),
????????error?->?System.out.println("onError:?"?+?error.getMessage()));

//?prints:
//?onNext:?2
//?onNext:?4
//?onNext:?3
//?onNext:?9
//?onError:?Something?went?wrong!

5、concatMapCompletable

作用于Flowable、Observable。與contactMap類似,不過應(yīng)用于函數(shù)后,返回的是CompletableSource。訂閱一次并在所有CompletableSource對(duì)象完成時(shí)返回一個(gè)Completable對(duì)象。

Observable?source?=?Observable.just(2,?1,?3);
Completable?completable?=?source.concatMapCompletable(x?->?{return?Completable.timer(x,?TimeUnit.SECONDS)
????????.doOnComplete(()?->?System.out.println("Info:?Processing?of?item?\""?+?x?+?"\"?completed"));
????});
completable.doOnComplete(()?->?System.out.println("Info:?Processing?of?all?items?completed"))
????.blockingAwait();//?prints://?Info:?Processing?of?item?"2"?completed//?Info:?Processing?of?item?"1"?completed//?Info:?Processing?of?item?"3"?completed//?Info:?Processing?of?all?items?completed

6、concatMapCompletableDelayError

與concatMapCompletable作用相同,只是將過程發(fā)送的所有錯(cuò)誤延遲到最后處理。

Observable?source?=?Observable.just(2,?1,?3);
Completable?completable?=?source.concatMapCompletableDelayError(x?->?{if?(x.equals(2))?{return?Completable.error(new?IOException("Processing?of?item?\""?+?x?+?"\"?failed!"));
????}?else?{return?Completable.timer(1,?TimeUnit.SECONDS)
????????????.doOnComplete(()?->?System.out.println("Info:?Processing?of?item?\""?+?x?+?"\"?completed"));
????}
});
completable.doOnError(error?->?System.out.println("Error:?"?+?error.getMessage()))
????.onErrorComplete()
????.blockingAwait();//?prints://?Info:?Processing?of?item?"1"?completed//?Info:?Processing?of?item?"3"?completed//?Error:?Processing?of?item?"2"?failed!

7、flatMap

作用于Flowable、Observable、Maybe、Single。與contactMap類似,只是contactMap的數(shù)據(jù)發(fā)射是有序的,而flatMap是無序的。

Observable.just("A",?"B",?"C")
????.flatMap(a?->?{
????????return?Observable.intervalRange(1,?3,?0,?1,?TimeUnit.SECONDS)
????????????????.map(b?->?'('?+?a?+?",?"?+?b?+?')');
????})
????.blockingSubscribe(System.out::println);

//?prints?(not?necessarily?in?this?order):
//?(A,?1)
//?(C,?1)
//?(B,?1)
//?(A,?2)
//?(C,?2)
//?(B,?2)
//?(A,?3)
//?(C,?3)
//?(B,?3)

8、flattenAsFlowable & flattenAsObservable

作用于Maybe、Single,將其轉(zhuǎn)化為Flowable,或Observable。

Single<Double>?source?=?Single.just(2.0);
Flowable<Double>?flowable?=?source.flattenAsFlowable(x?->?{
????return?List.of(x,?Math.pow(x,?2),?Math.pow(x,?3));
});

flowable.subscribe(x?->?System.out.println("onNext:?"?+?x));

//?prints:
//?onNext:?2.0
//?onNext:?4.0
//?onNext:?8.0

9、groupBy

作用于Flowable、Observable。根據(jù)一定的規(guī)則對(duì)數(shù)據(jù)源進(jìn)行分組。

Observable?animals?=?Observable.just("Tiger",?"Elephant",?"Cat",?"Chameleon",?"Frog",?"Fish",?"Turtle",?"Flamingo");
animals.groupBy(animal?->?animal.charAt(0),?String::toUpperCase)
????.concatMapSingle(Observable::toList)
????.subscribe(System.out::println);//?prints://?[TIGER,?TURTLE]//?[ELEPHANT]//?[CAT,?CHAMELEON]//?[FROG,?FISH,?FLAMINGO]

10、scan

作用于Flowable、Observable。對(duì)數(shù)據(jù)進(jìn)行相關(guān)聯(lián)操作,例如聚合等。

Observable.just(5,?3,?8,?1,?7)
????.scan(0,?(partialSum,?x)?->?partialSum?+?x)
????.subscribe(System.out::println);

//?prints:
//?0
//?5
//?8
//?16
//?17
//?24

11、window

對(duì)數(shù)據(jù)源發(fā)射出來的數(shù)據(jù)進(jìn)行收集,按照指定的數(shù)量進(jìn)行分組,以組的形式重新發(fā)射。

Observable.range(1,?4)
????//?Create?windows?containing?at?most?2?items,?and?skip?3?items?before?starting?a?new?window.
????.window(2)
????.flatMapSingle(window?->?{
????????return?window.map(String::valueOf)
????????????????.reduce(new?StringJoiner(",?",?"[",?"]"),?StringJoiner::add);
????})
????.subscribe(System.out::println);

//?prints:
//?[1,?2]
//?[3,?4]
6操作符:?錯(cuò)誤處理操作符

1、onErrorReturn

作用于Flowable、Observable、Maybe、Single。但調(diào)用數(shù)據(jù)源的onError函數(shù)后會(huì)回到該函數(shù),可對(duì)錯(cuò)誤進(jìn)行處理,然后返回值,會(huì)調(diào)用觀察者onNext()繼續(xù)執(zhí)行,執(zhí)行完調(diào)用onComplete()函數(shù)結(jié)束所有事件的發(fā)射。

Single.just("2A")
????.map(v?->?Integer.parseInt(v,?10))
????.onErrorReturn(error?->?{
????????if?(error?instanceof?NumberFormatException)?return?0;
????????else?throw?new?IllegalArgumentException();
????})
????.subscribe(
????????System.out::println,
????????error?->?System.err.println("onError?should?not?be?printed!"));

//?prints?0

2、onErrorReturnItem

與onErrorReturn類似,onErrorReturnItem不對(duì)錯(cuò)誤進(jìn)行處理,直接返回一個(gè)值。

Single.just("2A")
????.map(v?->?Integer.parseInt(v,?10))
????.onErrorReturnItem(0)
????.subscribe(
????????System.out::println,
????????error?->?System.err.println("onError?should?not?be?printed!"));

//?prints?0

3、onExceptionResumeNext

可作用于Flowable、Observable、Maybe。onErrorReturn發(fā)生異常時(shí),回調(diào)onComplete()函數(shù)后不再往下執(zhí)行,而onExceptionResumeNext則是要在處理異常的時(shí)候返回一個(gè)數(shù)據(jù)源,然后繼續(xù)執(zhí)行,如果返回null,則調(diào)用觀察者的onError()函數(shù)。

Observable.create((ObservableOnSubscribe)?e?->?{
????????????e.onNext(1);
????????????e.onNext(2);
????????????e.onNext(3);
????????????e.onError(new?NullPointerException());
????????????e.onNext(4);
????????})
????????????????.onErrorResumeNext(throwable?->?{
????????????????????Log.d(TAG,?"onErrorResumeNext?");return?Observable.just(4);
????????????????})
????????????????.subscribe(new?Observer()?{@Overridepublic?void?onSubscribe(Disposable?d)?{
????????????????????????Log.d(TAG,?"onSubscribe?");
????????????????????}@Overridepublic?void?onNext(Integer?integer)?{
????????????????????????Log.d(TAG,?"onNext?"?+?integer);
????????????????????}@Overridepublic?void?onError(Throwable?e)?{
????????????????????????Log.d(TAG,?"onError?");
????????????????????}@Overridepublic?void?onComplete()?{
????????????????????????Log.d(TAG,?"onComplete?");
????????????????????}
????????????????});

結(jié)果:

onExceptionResumeNext操作符也是類似的,只是捕獲Exception。

4、retry

可作用于所有的數(shù)據(jù)源,當(dāng)發(fā)生錯(cuò)誤時(shí),數(shù)據(jù)源重復(fù)發(fā)射item,直到?jīng)]有異常或者達(dá)到所指定的次數(shù)。

boolean?first=true;

Observable.create((ObservableOnSubscribe)?e?->?{
????????????e.onNext(1);
????????????e.onNext(2);if?(first){
????????????????first=false;
????????????????e.onError(new?NullPointerException());
????????????}
????????})
????????????????.retry(9)
????????????????.subscribe(new?Observer()?{@Overridepublic?void?onSubscribe(Disposable?d)?{
????????????????????????Log.d(TAG,?"onSubscribe?");
????????????????????}@Overridepublic?void?onNext(Integer?integer)?{
????????????????????????Log.d(TAG,?"onNext?"?+?integer);
????????????????????}@Overridepublic?void?onError(Throwable?e)?{
????????????????????????Log.d(TAG,?"onError?");
????????????????????}@Overridepublic?void?onComplete()?{
????????????????????????Log.d(TAG,?"onComplete?");
????????????????????}
????????????????});

結(jié)果:

5、retryUntil

作用于Flowable、Observable、Maybe。與retry類似,但發(fā)生異常時(shí),返回值是false表示繼續(xù)執(zhí)行(重復(fù)發(fā)射數(shù)據(jù)),true不再執(zhí)行,但會(huì)調(diào)用onError方法。

?Observable.create((ObservableOnSubscribe)?e?->?{
????????????e.onNext(1);
????????????e.onNext(2);
????????????e.onError(new?NullPointerException());
????????????e.onNext(3);
????????????e.onComplete();
????????})
????????????????.retryUntil(()?->?true)
????????????????.subscribe(new?Observer()?{@Overridepublic?void?onSubscribe(Disposable?d)?{
????????????????????????Log.d(TAG,?"onSubscribe?");
????????????????????}@Overridepublic?void?onNext(Integer?integer)?{
????????????????????????Log.d(TAG,?"onNext?"?+?integer);
????????????????????}@Overridepublic?void?onError(Throwable?e)?{
????????????????????????Log.d(TAG,?"onError?");
????????????????????}@Overridepublic?void?onComplete()?{
????????????????????????Log.d(TAG,?"onComplete?");
????????????????????}
????????????????});

結(jié)果:

retryWhen與此類似,但其判斷標(biāo)準(zhǔn)不是BooleanSupplier對(duì)象的getAsBoolean()函數(shù)的返回值。而是返回的 Observable或Flowable是否會(huì)發(fā)射異常事件。

總結(jié)

太多操作符太累了,看得心好累。還是根據(jù)實(shí)際開發(fā)需要查閱文檔才是正確的姿勢(shì)。

本文只是RxJava冰山一角,更多請(qǐng)參閱官網(wǎng)。

參閱官網(wǎng)

https://github.com/ReactiveX/RxJava

好東西要分享

https://github.com/Android-XXM/XXM-BLOG/blob/master/README.md

如果你看到了這,點(diǎn)個(gè)贊,收下我雙膝。如果文章有誤,幫忙指正,謝謝大佬們。

最后推薦一下我做的網(wǎng)站,玩Android:?wanandroid.com?,包含詳盡的知識(shí)體系、好用的工具,還有本公眾號(hào)文章合集,歡迎體驗(yàn)和收藏!

推薦閱讀:

直面底層:“吹上天”的協(xié)程,帶你深入源碼分析關(guān)于Android 抓包 與 反抓包直面底層:你真的了解 View.post() 原理嗎?

掃一掃?關(guān)注我的公眾號(hào)

如果你想要跟大家分享你的文章,歡迎投稿~

┏(^0^)┛明天見!

總結(jié)

以上是生活随笔為你收集整理的retrofit content-length为0_大佬们,一波RxJava 3.0来袭,请做好准备~的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。