RxJava 2.x 入门
之前只大概了解RxJava,并沒在實際的項目中實戰過,但最近在研究訊飛語音的一個demo的時候發現,他們都在使用mvvm,dagger2,rxjava2.x, 姿態很優雅,很吸引人,心想,臥槽再不嘗試一下就落后了,于是決定在項目中采用這些優秀的框架,與時俱進。在這里記錄梳理一下Rxjava2.x 的知識。
RxJava的優點就不多說了,直接接入正題。
1.添加依賴
compile 'io.reactivex.rxjava2:rxjava:2.1.1'2.Rxjava 原理
??RxJava 以觀察者模式為骨架
?Rxjava2.x 中有兩種觀察者模式:
- Observable ( 被觀察者 ) / Observer ( 觀察者 )
-
Flowable (被觀察者)/ Subscriber (觀察者)
3.基本使用
1).使用步驟
? ? ? 一:初始化 Observable?
? ? ? 二:初始化 Observer?
? ? ? 三:建立訂閱關系?
2).create
? ? ?可用于獲取一個別觀察者的對象。
??
Observable.create(new ObservableOnSubscribe<Integer>() { // 第一步:初始化Observable@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3);e.onNext(4);e.onComplete();}}).subscribe(new Observer<Integer>() { // 第三步:訂閱// 第二步:初始化Observer@Overridepublic void onSubscribe(@NonNull Disposable d) { }@Overridepublic void onNext(@NonNull Integer integer) {Log.e(TAG, "onNext : value : " + integer + "\n" );}@Overridepublic void onError(@NonNull Throwable e) {Log.e(TAG, "onError : value : " + e.getMessage() + "\n" );}@Overridepublic void onComplete() {Log.e(TAG, "onComplete" + "\n" );}});??Observable 通過在subcribe方法中調用e.onNext(1),? 在訂閱被觀察者之后,可以在訂閱者的onNext(Integer integer) 方法中獲取對應的值。
?
3).subScribeOn 與 observeOn
subscribeOn?用于指定?subscribe()?時所發生的線程
observeOn?方法用于指定下游?Observer(被觀察者)?回調發生的線程。
- 簡單地說,subscribeOn() 指定的就是發射事件的線程,observerOn 指定的就是訂閱者接收事件的線程。
- 多次指定發射事件的線程只有第一次指定的有效,也就是說多次調用 subscribeOn() 只有第一次的有效,其余的會被忽略。
- 但多次指定訂閱者接收線程是可以的,也就是說每調用一次 observerOn(),下游的線程就會切換一次。
實例代碼中,分別用 Schedulers.newThread() 和 Schedulers.io() 對發射線程進行切換,并采用 observeOn(AndroidSchedulers.mainThread() 和 Schedulers.io() 進行了接收線程的切換。可以看到輸出中發射線程僅僅響應了第一個 newThread,但每調用一次 observeOn() ,線程便會切換一次。
RxJava 中,已經內置了很多線程選項供我們選擇,例如有:
- Schedulers.io() 代表io操作的線程, 通常用于網絡,讀寫文件等io密集型的操作;
- Schedulers.computation() 代表CPU計算密集型的操作, 例如需要大量計算的操作;
- Schedulers.newThread() 代表一個常規的新線程;
- AndroidSchedulers.mainThread() 代表Android的主線程
?
4.操作符
4.1?map
map 操作符的作用是對上游大發送的每一個事件的Observables 通過一個函數,使得每一個事件都按照指定的函數去變化。
Observable.create(new ObservableOnSubscribe<Response>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception {Builder builder = new Builder().url("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512").get();Request request = builder.build();Call call = new OkHttpClient().newCall(request);Response response = call.execute();e.onNext(response);}}).map(new Function<Response, MobileAddress>() {@Overridepublic MobileAddress apply(@NonNull Response response) throws Exception {if (response.isSuccessful()) {ResponseBody body = response.body();if (body != null) {Log.e(TAG, "map:轉換前:" + response.body());return new Gson().fromJson(body.string(), MobileAddress.class);}}return null;}}).observeOn(AndroidSchedulers.mainThread()).doOnNext(new Consumer<MobileAddress>() {@Overridepublic void accept(@NonNull MobileAddress s) throws Exception {Log.e(TAG, "doOnNext: 保存成功:" + s.toString() + "\n");}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<MobileAddress>() {@Overridepublic void accept(@NonNull MobileAddress data) throws Exception {Log.e(TAG, "成功:" + data.toString() + "\n");}, new Consumer<Throwable>() {@Overridepublic void accept(@NonNull Throwable throwable) throws Exception {Log.e(TAG, "失敗:" + throwable.getMessage() + "\n");}});Observable 請求的數據發送后,經過map操作符,轉換成Gson解析后的bean對象,然后再傳到Observer中的accept 中。簡而言之,map的作用就是轉換數據。
4.2 concat 操作符
?
concat 連接操作符,其作用就是對數據的連接。可接受Observable的可變參數,或者Observable的集合。
?
Observable.concat(Observable.just(1,2,3), Observable.just(4,5,6)).subscribe(new Consumer<Integer>() {@Overridepublic void accept(@NonNull Integer integer) throws Exception {mRxOperatorsText.append("concat : "+ integer + "\n");Log.e(TAG, "concat : "+ integer + "\n" );}});輸出結果是:
concat 1 concat 2 concat 3 concat 4 concat 5 concat 64.3flatMap?
?
Observable?通過某種方法轉換為多個?Observables,然后再把這些分散的?Observables裝進一個單一的發射器?Observable.
應用場景:實現多個網絡請求依次依賴。
Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list").addQueryParameter("rows", 1 + "").build().getObjectObservable(FoodList.class) // 發起獲取食品列表的請求,并解析到FootList.subscribeOn(Schedulers.io()) // 在io線程進行網絡請求.observeOn(AndroidSchedulers.mainThread()) // 在主線程處理獲取食品列表的請求結果.doOnNext(new Consumer<FoodList>() {@Overridepublic void accept(@NonNull FoodList foodList) throws Exception {// 先根據獲取食品列表的響應結果做一些操作Log.e(TAG, "accept: doOnNext :" + foodList.toString());mRxOperatorsText.append("accept: doOnNext :" + foodList.toString()+"\n");}}).observeOn(Schedulers.io()) // 回到 io 線程去處理獲取食品詳情的請求.flatMap(new Function<FoodList, ObservableSource<FoodDetail>>() {@Overridepublic ObservableSource<FoodDetail> apply(@NonNull FoodList foodList) throws Exception {if (foodList != null && foodList.getTngou() != null && foodList.getTngou().size() > 0) {return Rx2AndroidNetworking.post("http://www.tngou.net/api/food/show").addBodyParameter("id", foodList.getTngou().get(0).getId() + "").build().getObjectObservable(FoodDetail.class);}return null;}}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<FoodDetail>() {@Overridepublic void accept(@NonNull FoodDetail foodDetail) throws Exception {Log.e(TAG, "accept: success :" + foodDetail.toString());mRxOperatorsText.append("accept: success :" + foodDetail.toString()+"\n");}}, new Consumer<Throwable>() {@Overridepublic void accept(@NonNull Throwable throwable) throws Exception {Log.e(TAG, "accept: error :" + throwable.getMessage());mRxOperatorsText.append("accept: error :" + throwable.getMessage()+"\n");}});4.4zip?
zip?操作符可以將多個?Observable?的數據結合為一個數據源再發射出去。
Observable<MobileAddress> observable1 = Rx2AndroidNetworking.get("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512").build().getObjectObservable(MobileAddress.class);Observable<CategoryResult> observable2 = Network.getGankApi().getCategoryData("Android",1,1);Observable.zip(observable1, observable2, new BiFunction<MobileAddress, CategoryResult, String>() {@Overridepublic String apply(@NonNull MobileAddress mobileAddress, @NonNull CategoryResult categoryResult) throws Exception {return "合并后的數據為:手機歸屬地:"+mobileAddress.getResult().getMobilearea()+"人名:"+categoryResult.results.get(0).who;}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {@Overridepublic void accept(@NonNull String s) throws Exception {Log.e(TAG, "accept: 成功:" + s+"\n");}}, new Consumer<Throwable>() {@Overridepublic void accept(@NonNull Throwable throwable) throws Exception {Log.e(TAG, "accept: 失敗:" + throwable+"\n");}});4.5interval?
Observable.interval(3, 2, TimeUnit.SECONDS).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()) // 由于interval默認在新線程,所以我們應該切回主線程.subscribe(new Consumer<Long>() {@Overridepublic void accept(@NonNull Long aLong) throws Exception {mRxOperatorsText.append("interval :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n");Log.e(TAG, "interval :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n");}});間隔執行操作,默認在新線程。
?
4.6 concatMap
與flatmap 一樣,區別在于,其保證了事件的順序。
Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3);}}).concatMap(new Function<Integer, ObservableSource<String>>() {@Overridepublic ObservableSource<String> apply(@NonNull Integer integer) throws Exception {List<String> list = new ArrayList<>();for (int i = 0; i < 3; i++) {list.add("I am value " + integer);}int delayTime = (int) (1 + Math.random() * 10);return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);}}).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {@Overridepublic void accept(@NonNull String s) throws Exception {Log.e(TAG, "concatMap : accept : " + s + "\n");mRxOperatorsText.append("concatMap : accept : " + s + "\n");}});實例中的輸出按照,發射的順序來執行。
?
4.7? doOnNext
讓訂閱者在接收到數據前干點事情?
Observable.just(1, 2, 3, 4).doOnNext(new Consumer<Integer>() {@Overridepublic void accept(@NonNull Integer integer) throws Exception {mRxOperatorsText.append("doOnNext 保存 " + integer + "成功" + "\n");Log.e(TAG, "doOnNext 保存 " + integer + "成功" + "\n");}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(@NonNull Integer integer) throws Exception {mRxOperatorsText.append("doOnNext :" + integer + "\n");Log.e(TAG, "doOnNext :" + integer + "\n");}});4.8 filter
?過濾操作符,取正確的值。
Observable.just(1, 20, 65, -5, 7, 19).filter(new Predicate<Integer>() {@Overridepublic boolean test(@NonNull Integer integer) throws Exception {return integer >= 10;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(@NonNull Integer integer) throws Exception {mRxOperatorsText.append("filter : " + integer + "\n");Log.e(TAG, "filter : " + integer + "\n");}});4.9 skip
接收一個long型的參數,表示跳過多少個數目的事件再開始接收。
Observable.just(1,2,3,4,5).skip(2).subscribe(new Consumer<Integer>() {@Overridepublic void accept(@NonNull Integer integer) throws Exception {mRxOperatorsText.append("skip : "+integer + "\n");Log.e(TAG, "skip : "+integer + "\n");}});示例中輸出的結果為:345。
4.10? take?
用于指定訂閱者最多接收到多少數據。
Flowable.fromArray(1,2,3,4,5).take(2).subscribe(new Consumer<Integer>() {@Overridepublic void accept(@NonNull Integer integer) throws Exception {mRxOperatorsText.append("take : "+integer + "\n");Log.e(TAG, "accept: take : "+integer + "\n" );}});示例中輸出的結果為:12。
4.11 timer
可以延遲執行一段邏輯,也可以間隔執行一段邏輯。
Observable.timer(2, TimeUnit.SECONDS).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Long>() {@Overridepublic void accept(@NonNull Long aLong) throws Exception {mRxOperatorsText.append("timer :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n");Log.e(TAG, "timer :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n");}});4.12 just
接收一個可變參數,一次發送。
Observable.just("1", "2", "3").subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {@Overridepublic void accept(@NonNull String s) throws Exception {mRxOperatorsText.append("accept : onNext : " + s + "\n");Log.e(TAG,"accept : onNext : " + s + "\n" );}});4.13 single
single只會接受一個參數而snigleObserver 只會調用onError或者onSuccess
Single.just("haha").subscribe(new SingleObserver<String>() {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onSuccess(String s) {Log.e("成功:"+s);}@Overridepublic void onError(Throwable e) {}});4.14 distinct
去重操作符
Observable.just(1, 1, 1, 2, 2, 3, 4, 5).distinct().subscribe(new Consumer<Integer>() {@Overridepublic void accept(@NonNull Integer integer) throws Exception {mRxOperatorsText.append("distinct : " + integer + "\n");Log.e(TAG, "distinct : " + integer + "\n");}});4.15 buffer
將observable中的數據按skip(步長)分成最長不超過count的buffer,然后生成一個observable。
Observable.just(1, 2, 3, 4, 5).buffer(3, 2).subscribe(new Consumer<List<Integer>>() {@Overridepublic void accept(@NonNull List<Integer> integers) throws Exception {mRxOperatorsText.append("buffer size : " + integers.size() + "\n");Log.e(TAG, "buffer size : " + integers.size() + "\n");mRxOperatorsText.append("buffer value : ");Log.e(TAG, "buffer value : " );for (Integer i : integers) {mRxOperatorsText.append(i + "");Log.e(TAG, i + "");}mRxOperatorsText.append("\n");Log.e(TAG, "\n");}});先取3個,每2個再取3個
4.16 debounce
過濾掉發射頻率過快的數據項。
Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {// send events with simulated time waitemitter.onNext(1); // skipThread.sleep(400);emitter.onNext(2); // deliverThread.sleep(505);emitter.onNext(3); // skipThread.sleep(100);emitter.onNext(4); // deliverThread.sleep(605);emitter.onNext(5); // deliverThread.sleep(510);emitter.onComplete();}}).debounce(500, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Integer>() {@Overridepublic void accept(@NonNull Integer integer) throws Exception {mRxOperatorsText.append("debounce :" + integer + "\n");Log.e(TAG,"debounce :" + integer + "\n");}});4.17 defer
就是在每次訂閱的時候就會創建一個新的Observable
Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<Integer>>() {@Overridepublic ObservableSource<Integer> call() throws Exception {return Observable.just(1, 2, 3);}});observable.subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {}@Overridepublic void onNext(@NonNull Integer integer) {mRxOperatorsText.append("defer : " + integer + "\n");Log.e(TAG, "defer : " + integer + "\n");}@Overridepublic void onError(@NonNull Throwable e) {mRxOperatorsText.append("defer : onError : " + e.getMessage() + "\n");Log.e(TAG, "defer : onError : " + e.getMessage() + "\n");}@Overridepublic void onComplete() {mRxOperatorsText.append("defer : onComplete\n");Log.e(TAG, "defer : onComplete\n");}});4.18 last?
取出最后一個值,參數沒有值的時候的默認值。
Observable.just(1, 2, 3).last(4).subscribe(new Consumer<Integer>() {@Overridepublic void accept(@NonNull Integer integer) throws Exception {mRxOperatorsText.append("last : " + integer + "\n");Log.e(TAG, "last : " + integer + "\n");}});4.19? merge
將多個Observable合起來,接收可變參數,也支持使用迭代器集合。
Observable.merge(Observable.just(1, 2), Observable.just(3, 4, 5)).subscribe(new Consumer<Integer>() {@Overridepublic void accept(@NonNull Integer integer) throws Exception {mRxOperatorsText.append("merge :" + integer + "\n");Log.e(TAG, "accept: merge :" + integer + "\n" );}});4.20 reduce
就是一次用一個方法處理一個值,可以有一個seed作為初始值。
Observable.just(1, 2, 3).reduce(new BiFunction<Integer, Integer, Integer>() {@Overridepublic Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {return integer + integer2;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(@NonNull Integer integer) throws Exception {mRxOperatorsText.append("reduce : " + integer + "\n");Log.e(TAG, "accept: reduce : " + integer + "\n");}});示例輸出結果為: 6;
?
4.21? scan?
和reduce差不多,scan會將過程中每一個結果輸出。
Observable.just(1, 2, 3).scan(new BiFunction<Integer, Integer, Integer>() {@Overridepublic Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {return integer + integer2;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(@NonNull Integer integer) throws Exception {mRxOperatorsText.append("scan " + integer + "\n");Log.e(TAG, "accept: scan " + integer + "\n");}});示例輸出:1 3 6
?
4.22 window
按照時間劃分窗口,將數據發送給不同的Observable。window操作符會在時間間隔內緩存結果
Observable.interval(1, TimeUnit.SECONDS) // 間隔一秒發一次.take(15) // 最多接收15個.window(3, TimeUnit.SECONDS).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Observable<Long>>() {@Overridepublic void accept(@NonNull Observable<Long> longObservable) throws Exception {mRxOperatorsText.append("Sub Divide begin...\n");Log.e(TAG, "Sub Divide begin...\n");longObservable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Long>() {@Overridepublic void accept(@NonNull Long aLong) throws Exception {mRxOperatorsText.append("Next:" + aLong + "\n");Log.e(TAG, "Next:" + aLong + "\n");}});}});4.23 PublishSubject??
onNext() 會通知每個觀察者
PublishSubject<Integer> publishSubject = PublishSubject.create();publishSubject.subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {mRxOperatorsText.append("First onSubscribe :"+d.isDisposed()+"\n");Log.e(TAG, "First onSubscribe :"+d.isDisposed()+"\n");}@Overridepublic void onNext(@NonNull Integer integer) {mRxOperatorsText.append("First onNext value :"+integer + "\n");Log.e(TAG, "First onNext value :"+integer + "\n");}@Overridepublic void onError(@NonNull Throwable e) {mRxOperatorsText.append("First onError:"+e.getMessage()+"\n");Log.e(TAG, "First onError:"+e.getMessage()+"\n" );}@Overridepublic void onComplete() {mRxOperatorsText.append("First onComplete!\n");Log.e(TAG, "First onComplete!\n");}});publishSubject.onNext(1);publishSubject.onNext(2);publishSubject.onNext(3);publishSubject.subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {mRxOperatorsText.append("Second onSubscribe :"+d.isDisposed()+"\n");Log.e(TAG, "Second onSubscribe :"+d.isDisposed()+"\n");}@Overridepublic void onNext(@NonNull Integer integer) {mRxOperatorsText.append("Second onNext value :"+integer + "\n");Log.e(TAG, "Second onNext value :"+integer + "\n");}@Overridepublic void onError(@NonNull Throwable e) {mRxOperatorsText.append("Second onError:"+e.getMessage()+"\n");Log.e(TAG, "Second onError:"+e.getMessage()+"\n" );}@Overridepublic void onComplete() {mRxOperatorsText.append("Second onComplete!\n");Log.e(TAG, "Second onComplete!\n");}});publishSubject.onNext(4);publishSubject.onNext(5);publishSubject.onComplete();?
總結
以上是生活随笔為你收集整理的RxJava 2.x 入门的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Android Studio:创建类时,
- 下一篇: Android Architecture