日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

生产调度java程序原码_Rxjava的线程调度源码解析

發布時間:2025/3/12 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 生产调度java程序原码_Rxjava的线程调度源码解析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

代碼調用

Observable.just(1)

.subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

.subscribe(new Consumer() {

@Override

public void accept(Integer integer) throws Exception {

}

});

直接進入主題,先看subscribe中調用了哪些方法

//Observable.java

public final Disposable subscribe(Consumer super T> onNext) {

return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());

}

public final Disposable subscribe(Consumer super T> onNext, Consumer super Throwable> onError,

Action onComplete, Consumer super Disposable> onSubscribe) {

ObjectHelper.requireNonNull(onNext, "onNext is null");

ObjectHelper.requireNonNull(onError, "onError is null");

ObjectHelper.requireNonNull(onComplete, "onComplete is null");

ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

LambdaObserver ls = new LambdaObserver(onNext, onError, onComplete, onSubscribe);

subscribe(ls);

return ls;

}

public final void subscribe(Observer super T> observer) {

ObjectHelper.requireNonNull(observer, "observer is null");

try {

observer = RxJavaPlugins.onSubscribe(this, observer);

ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

subscribeActual(observer);

} catch (NullPointerException e) { // NOPMD

throw e;

} catch (Throwable e) {

Exceptions.throwIfFatal(e);

// can't call onError because no way to know if a Disposable has been set or not

// can't call onSubscribe because the call might have set a Subscription already

RxJavaPlugins.onError(e);

NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");

npe.initCause(e);

throw npe;

}

}

//最終調用了Observable的subscribeActual方法

protected abstract void subscribeActual(Observer super T> observer);

接下來我們看下subscribeOn方法中進行了什么操作

//Observable.java

public final Observable subscribeOn(Scheduler scheduler) {

ObjectHelper.requireNonNull(scheduler, "scheduler is null");

//這里返回了一個ObservableSubscribeOn對象,參考RxJavaPlugins.onAssembly方法,

//返回的值就是傳入的值,再根據流式調用,

//即上面分析調用的subscribeActual方法,即是ObservableSubscribeOn的subscribeActual方法

return RxJavaPlugins.onAssembly(new ObservableSubscribeOn(this, scheduler));

}

接下來我們看ObservableSubscribeOn的subscribeActual方法

//ObservableSubscribeOn.java

public void subscribeActual(final Observer super T> observer) {

final SubscribeOnObserver parent = new SubscribeOnObserver(observer);

observer.onSubscribe(parent);

//這里生成了一個SubscribeTask,查看源碼可知實現了Runnable接口

//這里調用了scheduler.scheduleDirect

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

}

看下scheduler.scheduleDirect,再次之前,我們先看下傳入的Scheduler.io

查看傳入的Schedule

public static Scheduler io() {

// 這里查看下IO

return RxJavaPlugins.onIoScheduler(IO);

}

//new IOTask

IO = RxJavaPlugins.initIoScheduler(new IOTask());

static final class IOTask implements Callable {

@Override

public Scheduler call() throws Exception {

return IoHolder.DEFAULT;

}

//由此可見,最后Schedulers.io就是IoScheduler

static final class IoHolder {

static final Scheduler DEFAULT = new IoScheduler();

}

//scheduler

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {

//這里生成一個Worker,但是createWorker是一個虛方法,有上可知

//這里調用了IoScheduler.createWorker,生成EventLoopWorker對象

final Worker w = createWorker();

final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

DisposeTask task = new DisposeTask(decoratedRun, w);

//調用了EventLoopWorker.schedule

w.schedule(task, delay, unit);

return task;

}

接下來看EventLoopWorker

public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {

//取消注冊

if (tasks.isDisposed()) {

// don't schedule, we are unsubscribed

return EmptyDisposable.INSTANCE;

}

//NewThreadWorker.scheduleActual

return threadWorker.scheduleActual(action, delayTime, unit, tasks);

}

真正進入線程調度的代碼,在NewThreadWorker中

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {

Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

if (parent != null) {

if (!parent.add(sr)) {

return sr;

}

}

Future> f;

try {

if (delayTime <= 0) {

//executor是一個線程池

f = executor.submit((Callable)sr);

} else {

//存在延遲的

f = executor.schedule((Callable)sr, delayTime, unit);

}

sr.setFuture(f);

} catch (RejectedExecutionException ex) {

if (parent != null) {

parent.remove(sr);

}

RxJavaPlugins.onError(ex);

}

return sr;

}

所以到最后,真正進行線程調度的,其實是一個線程池,看完了subscribeOn,我們再來看看observeOn,首先看下AndroidSchedulers.mainThread()到底是哪個線程

public static Scheduler mainThread() {

return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);

}

private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(

new Callable() {

@Override public Scheduler call() throws Exception {

return MainHolder.DEFAULT;

}

});

private static final class MainHolder {

static final Scheduler DEFAULT

//從Looper.getMainLooper()可以看出,這里是獲取了主線程的Looper

= new HandlerScheduler(new Handler(Looper.getMainLooper()), false);

}

好確定了這個問題,我們再繼續往下看

public final Observable observeOn(Scheduler scheduler) {

return observeOn(scheduler, false, bufferSize());

}

public final Observable observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {

ObjectHelper.requireNonNull(scheduler, "scheduler is null");

ObjectHelper.verifyPositive(bufferSize, "bufferSize");

//生成一個新的ObservableObserverOn對象

return RxJavaPlugins.onAssembly(new ObservableObserveOn(this, scheduler, delayError, bufferSize));

}

接下去看ObservableObserveOn對象

protected void subscribeActual(Observer super T> observer) {

if (scheduler instanceof TrampolineScheduler) {

source.subscribe(observer);

} else {

//跟之前一樣還是調用createWorker,從上面代碼可知調用了HandlerScheduler.createWorker返回HandlerWorker

Scheduler.Worker w = scheduler.createWorker();

//這里有一個內部類對象ObserveOnObserver

source.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize));

}

}

//內部類ObserveOnObserver,以下是回調方法

public void onSubscribe(Disposable d) {

if (DisposableHelper.validate(this.upstream, d)) {

this.upstream = d;

if (d instanceof QueueDisposable) {

@SuppressWarnings("unchecked")

QueueDisposable qd = (QueueDisposable) d;

int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

if (m == QueueDisposable.SYNC) {

sourceMode = m;

queue = qd;

done = true;

downstream.onSubscribe(this);

//調用schedule

schedule();

return;

}

if (m == QueueDisposable.ASYNC) {

sourceMode = m;

queue = qd;

downstream.onSubscribe(this);

return;

}

}

queue = new SpscLinkedArrayQueue(bufferSize);

downstream.onSubscribe(this);

}

}

@Override

public void onNext(T t) {

if (done) {

return;

}

if (sourceMode != QueueDisposable.ASYNC) {

queue.offer(t);

}

//調用schedule

schedule();

}

@Override

public void onError(Throwable t) {

if (done) {

RxJavaPlugins.onError(t);

return;

}

error = t;

done = true;

//調用schedule

schedule();

}

@Override

public void onComplete() {

if (done) {

return;

}

done = true;

//調用schedule

schedule();

}

void schedule() {

if (getAndIncrement() == 0) {

//所以當回調的時候,最終是調用了worker.schedule

worker.schedule(this);

}

}

//最終看一下HandlerWorker的schedule方法,一看源碼,老朋友了,Handler就不解釋了

public Disposable schedule(Runnable run, long delay, TimeUnit unit) {

if (run == null) throw new NullPointerException("run == null");

if (unit == null) throw new NullPointerException("unit == null");

if (disposed) {

return Disposables.disposed();

}

run = RxJavaPlugins.onSchedule(run);

ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

Message message = Message.obtain(handler, scheduled);

message.obj = this; // Used as token for batch disposal of this worker's runnables.

if (async) {

message.setAsynchronous(true);

}

handler.sendMessageDelayed(message, unit.toMillis(delay));

// Re-check disposed state for removing in case we were racing a call to dispose().

if (disposed) {

handler.removeCallbacks(scheduled);

return Disposables.disposed();

}

return scheduled;

}

總結

以上是生活随笔為你收集整理的生产调度java程序原码_Rxjava的线程调度源码解析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。