日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > Android >内容正文

Android

RxJava 和 RxAndroid 五(线程调度)

發布時間:2024/9/30 Android 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RxJava 和 RxAndroid 五(线程调度) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
對rxJava不了解的同學可以先看

RxJava 和 RxAndroid 一 (基礎)
RxJava 和 RxAndroid 二(操作符的使用)
RxJava 和 RxAndroid 三(生命周期控制和內存優化)

RxJava 和 RxAndroid 四(RxBinding的使用)

?

本文將有幾個例子說明,rxjava線程調度的正確使用姿勢。

例1

Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {Logger.v( "rx_call" , Thread.currentThread().getName() );subscriber.onNext( "dd");subscriber.onCompleted();}}).map(new Func1<String, String >() {@Overridepublic String call(String s) {Logger.v( "rx_map" , Thread.currentThread().getName() );return s + "88";}}).subscribe(new Action1<String>() {@Overridepublic void call(String s) {Logger.v( "rx_subscribe" , Thread.currentThread().getName() );}}) ;

  結果

/rx_call: main ? ? ? ? ? -- 主線程
/rx_map: main ? ? ? ?-- ?主線程
/rx_subscribe: main ? -- 主線程

例2

new Thread(new Runnable() {@Overridepublic void run() {Logger.v( "rx_newThread" , Thread.currentThread().getName() );rx();}}).start();void rx(){Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {Logger.v( "rx_call" , Thread.currentThread().getName() );subscriber.onNext( "dd");subscriber.onCompleted();}}).map(new Func1<String, String >() {@Overridepublic String call(String s) {Logger.v( "rx_map" , Thread.currentThread().getName() );return s + "88";}}).subscribe(new Action1<String>() {@Overridepublic void call(String s) {Logger.v( "rx_subscribe" , Thread.currentThread().getName() );}}) ;}

 

? ? ? 結果

/rx_newThread: Thread-564 ? -- 子線程
/rx_call: Thread-564 ? ? ? ? ? ? ?-- 子線程
/rx_map: Thread-564 ? ? ? ? ? ?-- 子線程?
/rx_subscribe: Thread-564 ? ?-- 子線程

?

  • 通過例1和例2,說明,Rxjava默認運行在當前線程中。如果當前線程是子線程,則rxjava運行在子線程;同樣,當前線程是主線程,則rxjava運行在主線程

?

例3

Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {Logger.v( "rx_call" , Thread.currentThread().getName() );subscriber.onNext( "dd");subscriber.onCompleted();}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).map(new Func1<String, String >() {@Overridepublic String call(String s) {Logger.v( "rx_map" , Thread.currentThread().getName() );return s + "88";}}).subscribe(new Action1<String>() {@Overridepublic void call(String s) {Logger.v( "rx_subscribe" , Thread.currentThread().getName() );}}) ;

  結果

/rx_call: RxCachedThreadScheduler-1 ? ?--io線程
/rx_map: main ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??--主線程
/rx_subscribe: main ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?--主線程

?

例4

Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {Logger.v( "rx_call" , Thread.currentThread().getName() );subscriber.onNext( "dd");subscriber.onCompleted();}}).map(new Func1<String, String >() {@Overridepublic String call(String s) {Logger.v( "rx_map" , Thread.currentThread().getName() );return s + "88";}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<String>() {@Overridepublic void call(String s) {Logger.v( "rx_subscribe" , Thread.currentThread().getName() );}}) ; 

? ? ? 結果

/rx_call: RxCachedThreadScheduler-1 ? ? --io線程
/rx_map: RxCachedThreadScheduler-1 ? --io線程
/rx_subscribe: main ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?--主線程

? ?

  • 通過例3、例4 可以看出??.subscribeOn(Schedulers.io()) ?和?.observeOn(AndroidSchedulers.mainThread()) 寫的位置不一樣,造成的結果也不一樣。從例4中可以看出 map() 操作符默認運行在事件產生的線程之中。事件消費只是在 subscribe() 里面。
  • 對于 create() , just() , from() ? 等 ? ? ? ? ? ? ? ? --- 事件產生 ??

? ? ? ? ? ? ? ?map() , flapMap() , scan() , filter() ?等 ? ?-- ?事件加工

? ? ? ? ? ? ? subscribe() ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?-- ?事件消費

  • ? 事件產生:默認運行在當前線程,可以由 subscribeOn() ?自定義線程

? ? ? ? ?事件加工:默認跟事件產生的線程保持一致, 可以由 observeOn() 自定義線程

? ? ? ?事件消費:默認運行在當前線程,可以有observeOn() 自定義

?

例5 ?多次切換線程

?

Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {Logger.v( "rx_call" , Thread.currentThread().getName() );subscriber.onNext( "dd");subscriber.onCompleted();}}).observeOn( Schedulers.newThread() ) //新線程.map(new Func1<String, String >() {@Overridepublic String call(String s) {Logger.v( "rx_map" , Thread.currentThread().getName() );return s + "88";}}).observeOn( Schedulers.io() ) //io線程.filter(new Func1<String, Boolean>() {@Overridepublic Boolean call(String s) {Logger.v( "rx_filter" , Thread.currentThread().getName() );return s != null ;}}).subscribeOn(Schedulers.io()) //定義事件產生線程:io線程.observeOn(AndroidSchedulers.mainThread()) //事件消費線程:主線程.subscribe(new Action1<String>() {@Overridepublic void call(String s) {Logger.v( "rx_subscribe" , Thread.currentThread().getName() );}}) ;

  結果

/rx_call: RxCachedThreadScheduler-1 ? ? ? ? ? -- io 線程
/rx_map: RxNewThreadScheduler-1 ? ? ? ? ? ??-- new出來的線程
/rx_filter: RxCachedThreadScheduler-2 ? ? ? ?-- io線程
/rx_subscribe: main ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? -- 主線程

?

例6:只規定了事件產生的線程

Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {Log.v( "rx--create " , Thread.currentThread().getName() ) ;subscriber.onNext( "dd" ) ;}}).subscribeOn(Schedulers.io()).subscribe(new Action1<String>() {@Overridepublic void call(String s) {Log.v( "rx--subscribe " , Thread.currentThread().getName() ) ;}}) ;

  結果

/rx--create: RxCachedThreadScheduler-4 ? ? ? ? ? ? ? ? ? ? ?// io 線程
/rx--subscribe: RxCachedThreadScheduler-4 ? ? ? ? ? ? ? ? // io 線程

? ? ?

例:7:只規定事件消費線程

Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {Log.v( "rx--create " , Thread.currentThread().getName() ) ;subscriber.onNext( "dd" ) ;}}).observeOn( Schedulers.newThread() ).subscribe(new Action1<String>() {@Overridepublic void call(String s) {Log.v( "rx--subscribe " , Thread.currentThread().getName() ) ;}}) ;

  結果

/rx--create: main ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? -- 主線程
/rx--subscribe: RxNewThreadScheduler-1 ? ? ? ?-- ?new 出來的子線程?

? ? ??

? ? 從例6可以看出,如果只規定了事件產生的線程,那么事件消費線程將跟隨事件產生線程。

? ? 從例7可以看出,如果只規定了事件消費的線程,那么事件產生的線程和 當前線程保持一致。

?

例8:線程調度封裝

?在Android 常常有這樣的場景,后臺處理處理數據,前臺展示數據。

一般的用法:

Observable.just( "123" ).subscribeOn( Schedulers.io()).observeOn( AndroidSchedulers.mainThread() ).subscribe(new Action1() {@Overridepublic void call(Object o) {}}) ;

  但是項目中這種場景有很多,所以我們就想能不能把這種場景的調度方式封裝起來,方便調用。

簡單的封裝

public Observable apply( Observable observable ){return observable.subscribeOn( Schedulers.io() ).observeOn( AndroidSchedulers.mainThread() ) ;}

使用

apply( Observable.just( "123" ) ).subscribe(new Action1() {@Overridepublic void call(Object o) {}}) ;

弊端:雖然上面的這種封裝可以做到線程調度的目的,但是它破壞了鏈式編程的結構,是編程風格變得不優雅。

改進:Transformers 的使用(就是轉化器的意思,把一種類型的Observable轉換成另一種類型的Observable?)

改進后的封裝

Observable.Transformer schedulersTransformer = new Observable.Transformer() {@Override public Object call(Object observable) {return ((Observable) observable).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread());}};

  使用

Observable.just( "123" ).compose( schedulersTransformer ).subscribe(new Action1() {@Overridepublic void call(Object o) {}}) ;

  弊端:雖然保持了鏈式編程結構的完整,但是每次調用?.compose( schedulersTransformer ) 都是 new 了一個對象的。所以我們需要再次封裝,盡量保證單例的模式。

改進后的封裝

package lib.app.com.myapplication;import rx.Observable; import rx.android.schedulers.AndroidSchedulers; import rx.schedulers.Schedulers;/*** Created by ${zyj} on 2016/7/1.*/ public class RxUtil {private final static Observable.Transformer schedulersTransformer = new Observable.Transformer() {@Override public Object call(Object observable) {return ((Observable) observable).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread());}};public static <T> Observable.Transformer<T, T> applySchedulers() {return (Observable.Transformer<T, T>) schedulersTransformer;}}

  使用

Observable.just( "123" ).compose( RxUtil.<String>applySchedulers() ).subscribe(new Action1() {@Overridepublic void call(Object o) {}}) ;

  

?

?

 

?

總結

以上是生活随笔為你收集整理的RxJava 和 RxAndroid 五(线程调度)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。