日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

Reactor 3快速上手——响应式Spring的道法术器

發布時間:2024/1/18 javascript 44 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Reactor 3快速上手——响应式Spring的道法术器 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

在1.3.2節簡單介紹了不同類型的調度器Scheduler,以及如何使用publishOn和subscribeOn切換不同的線程執行環境。

下邊使用一個簡單的例子再回憶一下:

@Testpublic void testScheduling() {Flux.range(0, 10) // .log() // 1.publishOn(Schedulers.newParallel("myParallel")) // .log() // 2.subscribeOn(Schedulers.newElastic("myElastic")).log() // 3.blockLast();}
  • 只保留這個log()的話,可以看到,源頭數據流是執行在myElastic-x線程上的;
  • 只保留這個log()的話,可以看到,publishOn之后數據流是執行在myParallel-x線程上的;
  • 只保留這個log()的話,可以看到,subscribeOn之后數據流依然是執行在myParallel-x線程上的。
  • 通過以上三個log()的輸出,可以發現,對于如下圖所示的操作鏈:

    • publishOn會影響鏈中其后的操作符,比如第一個publishOn調整調度器為elastic,則filter的處理操作是在彈性線程池中執行的;同理,flatMap是執行在固定大小的parallel線程池中的;
    • subscribeOn無論出現在什么位置,都只影響源頭的執行環境,也就是range方法是執行在單線程中的,直至被第一個publishOn切換調度器之前,所以range后的map也在單線程中執行。

    這一節我們了解一下它的實現機制。

    2.4.1 調度器

    調度器相當于Reactor中的ExecutorService,不同的調度器定義不同的線程執行環境。Schedulers工具類提供的靜態方法可搭建不同的線程執行環境。

    Schedulers類已經預先創建了幾種常用的不同線程池模型的調度器:使用single()、elastic()和parallel()方法創建的調度器可以分別使用內置的單線程、彈性線程池和固定大小線程池。如果想創建新的調度器,可以使用newSingle()、newElastic()和newParallel()方法。這些方法都是返回一個Scheduler的具體實現。

    看一下Scheduler都有哪些行為:

    public interface Scheduler extends Disposable {// 調度執行Runnable任務task。Disposable schedule(Runnable task);// 延遲一段指定的時間后執行。Disposable schedule(Runnable task, long delay, TimeUnit unit);// 周期性地執行任務。Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);// 創建一個工作線程。Worker createWorker();// 啟動調度器void start();// 以下兩個方法可以暫時忽略void dispose();long now(TimeUnit unit)// 一個Worker代表調度器可調度的一個工作線程,在一個Worker內,遵循FIFO(先進先出)的任務執行策略interface Worker extends Disposable {// 調度執行Runnable任務task。Disposable schedule(Runnable task);// 延遲一段指定的時間后執行。Disposable schedule(Runnable task, long delay, TimeUnit unit);// 周期性地執行任務。Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);} }

    如圖所示,Scheduler是領導,Worker是員工,每個Scheduler手中有若干Worker。接到任務后,Scheduler負責分派,Worker負責干活。

    在Scheduler中,每個Worker都是一個ScheduledExecutorService,或一個包裝了ScheduledExecutorService的對象。所以,Scheduler擁有的并不是線程池,而是一個自行維護的ScheduledExecutorService池。

    所謂“自行維護”,主要有三點:

  • 可供調遣的Worker。比如Schedulers.newParallel()返回的ParallelScheduler,其內維護的是一個固定大小的ScheduledExecutorService[]數組;而ElasticScheduler由一個ExecutorService的Queue來維護。
  • 任務分派策略。ElasticScheduler和ParallelScheduler都有一個pick()方法,用來選出合適的Worker。
  • 對于要處理的任務,包裝為Callable,從而可以異步地返回一個Future給調用者。
  • 2.4.2 切換執行環境

    再回到publishOn和subscribeOn方法。

    在Reactor中,對于數據流的處理,實際上是一系列方法調用和基于事件的回調,包括subscribe、onSubscribe、request,以及onNext、onError、onComplete。拿出2.1節的圖幫助理解:

    當調用.subscribe()方法時,會形成從上游向下游的數據流,數據流中的元素通過onNext* (onError|onComplete)攜帶“順流而下”。同時,Reactor使用者看不到的是,還有一條從下游向上游的“訂閱鏈”,request就是沿著這個鏈向上反饋需求的。

    publishOn方法能夠將onNext、onError、onComplete調度到給定的Scheduler的Worker上執行。所以如上圖場景中,再.map和.filter中間增加一個publisheOn(Schedulers.elastic())的話,.filter操作的onNext的過濾處理將會執行在ElasticScheduler的某個Worker上。

    subscribeOn方法能夠將subscribe(會調用onSubscribe)、request調度到給定的Scheduler的Worker上執行。所以在任何位置增加一個subscribeOn(Schedulers.elastic())的話,都會借助自下而上的訂閱鏈,通過subscribe()方法,將線程執行環境傳遞到“源頭”,從而Flux.just會執行在ElasticScheduler上。繼而影響到其后的操作符,直至遇到publishOn改變了執行環境。

    此外,有些操作符本身會需要調度器來進行多線程的處理,當你不明確指定調度器的時候,那些操作符會自行使用內置的單例調度器來執行。例如,Flux.delayElements(Duration)?使用的是?Schedulers.parallel()調度器對象:

    @Testpublic void testDelayElements() {Flux.range(0, 10).delayElements(Duration.ofMillis(10)).log().blockLast();}

    從輸出可以看到onNext運行在不同的線程上:

    [ INFO] (main) onSubscribe(FluxConcatMap.ConcatMapImmediate) [ INFO] (main) request(unbounded) [ INFO] (parallel-1) onNext(0) [ INFO] (parallel-2) onNext(1) [ INFO] (parallel-3) onNext(2) [ INFO] (parallel-4) onNext(3) ...

    2.4.3 為數據流配置Context

    在Reactor中,基于Scheduler的線程調度確實非常簡單好用,但是還有個問題需要解決。

    我們以往在編寫多線程的代碼時,如果涉及到只在線程內部使用的值,可能會使用ThreadLocal進行包裝。

    但是在響應式編程中,由于線程環境經常發生變化,這一用法就失去作用了,并且甚至帶來bug。比如,使用 Logback 的 MDC 來存儲日志關聯的 ID 就屬于這種情況。

    自從版本 3.1.0,Reactor 引入了一個類似于 ThreadLocal 的高級功能:Context。它作用于一個 Flux 或一個 Mono 上,而不是應用于一個線程(Thread)。也就是其生命周期伴隨整個數據流,而不是線程。

    相對來說,用戶使用Context并不多,對此感興趣或有此需求的話,請看我翻譯的相關文檔,可以對Reactor內部實現尤其是Subscription有更深的理解。

    2.4.4 并行執行

    如今多核架構已然普及,能夠方便的進行并行處理是很重要的。

    對于一些能夠在一個線程中順序處理的任務,即使調度到ParallelScheduler上,通常也只由一個Worker來執行,比如:

    @Testpublic void testParallelFlux() throws InterruptedException {Flux.range(1, 10).publishOn(Schedulers.parallel()).log().subscribe();TimeUnit.MILLISECONDS.sleep(10);}

    輸出如下:

    [ INFO] (main) | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber) [ INFO] (main) | request(unbounded) [ INFO] (parallel-1) | onNext(1) [ INFO] (parallel-1) | onNext(2) [ INFO] (parallel-1) | onNext(3) [ INFO] (parallel-1) | onNext(4) [ INFO] (parallel-1) | onNext(5) [ INFO] (parallel-1) | onNext(6) [ INFO] (parallel-1) | onNext(7) [ INFO] (parallel-1) | onNext(8) [ INFO] (parallel-1) | onNext(9) [ INFO] (parallel-1) | onNext(10) [ INFO] (parallel-1) | onComplete()

    有時候,我們確實需要一些任務能夠“均勻”分布在不同的工作線程上執行,這時候就需要用到ParallelFlux。

    你可以對任何Flux使用parallel()操作符來得到一個ParallelFlux。不過這個操作符本身并不會進行并行處理,而只是將負載劃分到多個執行“軌道”上(默認情況下,軌道個數與CPU核數相等)。

    為了配置ParallelFlux如何并行地執行每一個軌道,需要使用runOn(Scheduler),這里,Schedulers.parallel() 是比較推薦的專門用于并行處理的調度器。

    @Testpublic void testParallelFlux() throws InterruptedException {Flux.range(1, 10).parallel(2).runOn(Schedulers.parallel()) // .publishOn(Schedulers.parallel()).log().subscribe();TimeUnit.MILLISECONDS.sleep(10);}

    輸出如下:

    [ INFO] (main) onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber) [ INFO] (main) request(unbounded) [ INFO] (main) onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber) [ INFO] (main) request(unbounded) [ INFO] (parallel-1) onNext(1) [ INFO] (parallel-2) onNext(2) [ INFO] (parallel-1) onNext(3) [ INFO] (parallel-2) onNext(4) [ INFO] (parallel-1) onNext(5) [ INFO] (parallel-2) onNext(6) [ INFO] (parallel-1) onNext(7) [ INFO] (parallel-2) onNext(8) [ INFO] (parallel-1) onNext(9) [ INFO] (parallel-2) onNext(10) [ INFO] (parallel-1) onComplete() [ INFO] (parallel-2) onComplete()

    可以看到,各個元素的onNext “均勻”分布執行在兩個線程上,最后每個線程上有獨立的onComplete事件,這與publishOn調度到ParallelScheduler上的情況是不同的。

    ?

    https://blog.51cto.com/liukang/2090163

    總結

    以上是生活随笔為你收集整理的Reactor 3快速上手——响应式Spring的道法术器的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。