日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

使用Reactor进行反应式编程最全教程

發(fā)布時間:2024/9/20 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 使用Reactor进行反应式编程最全教程 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

反應(yīng)式編程(Reactive Programming)這種新的編程范式越來越受到開發(fā)人員的歡迎。在 Java 社區(qū)中比較流行的是 RxJava 和 RxJava 2。本文要介紹的是另外一個新的反應(yīng)式編程庫 Reactor。

反應(yīng)式編程介紹

反應(yīng)式編程來源于數(shù)據(jù)流和變化的傳播,意味著由底層的執(zhí)行模型負(fù)責(zé)通過數(shù)據(jù)流來自動傳播變化。比如求值一個簡單的表達(dá)式 c=a+b,當(dāng) a 或者 b 的值發(fā)生變化時,傳統(tǒng)的編程范式需要對 a+b 進(jìn)行重新計算來得到 c 的值。如果使用反應(yīng)式編程,當(dāng) a 或者 b 的值發(fā)生變化時,c 的值會自動更新。反應(yīng)式編程最早由 .NET 平臺上的 Reactive Extensions (Rx) 庫來實現(xiàn)。后來遷移到 Java 平臺之后就產(chǎn)生了著名的 RxJava 庫,并產(chǎn)生了很多其他編程語言上的對應(yīng)實現(xiàn)。在這些實現(xiàn)的基礎(chǔ)上產(chǎn)生了后來的反應(yīng)式流(Reactive Streams)規(guī)范。該規(guī)范定義了反應(yīng)式流的相關(guān)接口,并將集成到 Java 9 中。

在傳統(tǒng)的編程范式中,我們一般通過迭代器(Iterator)模式來遍歷一個序列。這種遍歷方式是由調(diào)用者來控制節(jié)奏的,采用的是拉的方式。每次由調(diào)用者通過 next()方法來獲取序列中的下一個值。使用反應(yīng)式流時采用的則是推的方式,即常見的發(fā)布者-訂閱者模式。當(dāng)發(fā)布者有新的數(shù)據(jù)產(chǎn)生時,這些數(shù)據(jù)會被推送到訂閱者來進(jìn)行處理。在反應(yīng)式流上可以添加各種不同的操作來對數(shù)據(jù)進(jìn)行處理,形成數(shù)據(jù)處理鏈。這個以聲明式的方式添加的處理鏈只在訂閱者進(jìn)行訂閱操作時才會真正執(zhí)行。

反應(yīng)式流中第一個重要概念是負(fù)壓(backpressure)。在基本的消息推送模式中,當(dāng)消息發(fā)布者產(chǎn)生數(shù)據(jù)的速度過快時,會使得消息訂閱者的處理速度無法跟上產(chǎn)生的速度,從而給訂閱者造成很大的壓力。當(dāng)壓力過大時,有可能造成訂閱者本身的奔潰,所產(chǎn)生的級聯(lián)效應(yīng)甚至可能造成整個系統(tǒng)的癱瘓。負(fù)壓的作用在于提供一種從訂閱者到生產(chǎn)者的反饋渠道。訂閱者可以通過 request()方法來聲明其一次所能處理的消息數(shù)量,而生產(chǎn)者就只會產(chǎn)生相應(yīng)數(shù)量的消息,直到下一次 request()方法調(diào)用。這實際上變成了推拉結(jié)合的模式。

Reactor 簡介

前面提到的 RxJava 庫是 JVM 上反應(yīng)式編程的先驅(qū),也是反應(yīng)式流規(guī)范的基礎(chǔ)。RxJava 2 在 RxJava 的基礎(chǔ)上做了很多的更新。不過 RxJava 庫也有其不足的地方。RxJava 產(chǎn)生于反應(yīng)式流規(guī)范之前,雖然可以和反應(yīng)式流的接口進(jìn)行轉(zhuǎn)換,但是由于底層實現(xiàn)的原因,使用起來并不是很直觀。RxJava 2 在設(shè)計和實現(xiàn)時考慮到了與規(guī)范的整合,不過為了保持與 RxJava 的兼容性,很多地方在使用時也并不直觀。Reactor 則是完全基于反應(yīng)式流規(guī)范設(shè)計和實現(xiàn)的庫,沒有 RxJava 那樣的歷史包袱,在使用上更加的直觀易懂。Reactor 也是 Spring 5 中反應(yīng)式編程的基礎(chǔ)。學(xué)習(xí)和掌握 Reactor 可以更好地理解 Spring 5 中的相關(guān)概念。

在 Java 程序中使用 Reactor 庫非常的簡單,只需要通過 Maven 或 Gradle 來添加對 io.projectreactor:reactor-core 的依賴即可,目前的版本是 3.0.5.RELEASE。

Flux 和 Mono

Flux 和 Mono 是 Reactor 中的兩個基本概念。Flux 表示的是包含 0 到 N 個元素的異步序列。在該序列中可以包含三種不同類型的消息通知:正常的包含元素的消息、序列結(jié)束的消息和序列出錯的消息。當(dāng)消息通知產(chǎn)生時,訂閱者中對應(yīng)的方法 onNext(), onComplete()和 onError()會被調(diào)用。Mono 表示的是包含 0 或者 1 個元素的異步序列。該序列中同樣可以包含與 Flux 相同的三種類型的消息通知。Flux 和 Mono 之間可以進(jìn)行轉(zhuǎn)換。對一個 Flux 序列進(jìn)行計數(shù)操作,得到的結(jié)果是一個 Mono<Long>對象。把兩個 Mono 序列合并在一起,得到的是一個 Flux 對象。

創(chuàng)建 Flux

有多種不同的方式可以創(chuàng)建 Flux 序列。

Flux 類的靜態(tài)方法

第一種方式是通過 Flux 類中的靜態(tài)方法。

  • just():可以指定序列中包含的全部元素。創(chuàng)建出來的 Flux 序列在發(fā)布這些元素之后會自動結(jié)束。
  • fromArray(),fromIterable()和 fromStream():可以從一個數(shù)組、Iterable 對象或 Stream 對象中創(chuàng)建 Flux 對象。
  • empty():創(chuàng)建一個不包含任何元素,只發(fā)布結(jié)束消息的序列。
  • error(Throwable error):創(chuàng)建一個只包含錯誤消息的序列。
  • never():創(chuàng)建一個不包含任何消息通知的序列。
  • range(int start, int count):創(chuàng)建包含從 start 起始的 count 個數(shù)量的 Integer 對象的序列。
  • interval(Duration period)和 interval(Duration delay, Duration period):創(chuàng)建一個包含了從 0 開始遞增的 Long 對象的序列。其中包含的元素按照指定的間隔來發(fā)布。除了間隔時間之外,還可以指定起始元素發(fā)布之前的延遲時間。
  • intervalMillis(long period)和 intervalMillis(long delay, long period):與 interval()方法的作用相同,只不過該方法通過毫秒數(shù)來指定時間間隔和延遲時間。

代碼清單 1 中給出了上述這些方法的使用示例。

清單 1. 通過 Flux 類的靜態(tài)方法創(chuàng)建 Flux 序列

1

2

3

4

5

6

Flux.just("Hello", "World").subscribe(System.out::println);

Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);

Flux.empty().subscribe(System.out::println);

Flux.range(1, 10).subscribe(System.out::println);

Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);

Flux.intervalMillis(1000).subscribe(System.out::println);

上面的這些靜態(tài)方法適合于簡單的序列生成,當(dāng)序列的生成需要復(fù)雜的邏輯時,則應(yīng)該使用 generate() 或 create() 方法。

generate()方法

generate()方法通過同步和逐一的方式來產(chǎn)生 Flux 序列。序列的產(chǎn)生是通過調(diào)用所提供的 SynchronousSink 對象的 next(),complete()和 error(Throwable)方法來完成的。逐一生成的含義是在具體的生成邏輯中,next()方法只能最多被調(diào)用一次。在有些情況下,序列的生成可能是有狀態(tài)的,需要用到某些狀態(tài)對象。此時可以使用 generate()方法的另外一種形式 generate(Callable<S> stateSupplier, BiFunction<S,SynchronousSink<T>,S> generator),其中 stateSupplier 用來提供初始的狀態(tài)對象。在進(jìn)行序列生成時,狀態(tài)對象會作為 generator 使用的第一個參數(shù)傳入,可以在對應(yīng)的邏輯中對該狀態(tài)對象進(jìn)行修改以供下一次生成時使用。

在代碼清單 2中,第一個序列的生成邏輯中通過 next()方法產(chǎn)生一個簡單的值,然后通過 complete()方法來結(jié)束該序列。如果不調(diào)用 complete()方法,所產(chǎn)生的是一個無限序列。第二個序列的生成邏輯中的狀態(tài)對象是一個 ArrayList 對象。實際產(chǎn)生的值是一個隨機數(shù)。產(chǎn)生的隨機數(shù)被添加到 ArrayList 中。當(dāng)產(chǎn)生了 10 個數(shù)時,通過 complete()方法來結(jié)束序列。

清單 2. 使用 generate()方法生成 Flux 序列

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

Flux.generate(sink -> {

????sink.next("Hello");

????sink.complete();

}).subscribe(System.out::println);

final Random random = new Random();

Flux.generate(ArrayList::new, (list, sink) -> {

????int value = random.nextInt(100);

????list.add(value);

????sink.next(value);

????if (list.size() == 10) {

????????sink.complete();

????}

????return list;

}).subscribe(System.out::println);

create()方法

create()方法與 generate()方法的不同之處在于所使用的是 FluxSink 對象。FluxSink 支持同步和異步的消息產(chǎn)生,并且可以在一次調(diào)用中產(chǎn)生多個元素。在代碼清單 3 中,在一次調(diào)用中就產(chǎn)生了全部的 10 個元素。

清單 3. 使用 create()方法生成 Flux 序列

1

2

3

4

5

6

Flux.create(sink -> {

????for (int i = 0; i < 10; i++) {

????????sink.next(i);

????}

????sink.complete();

}).subscribe(System.out::println);

創(chuàng)建 Mono

Mono 的創(chuàng)建方式與之前介紹的 Flux 比較相似。Mono 類中也包含了一些與 Flux 類中相同的靜態(tài)方法。這些方法包括 just(),empty(),error()和 never()等。除了這些方法之外,Mono 還有一些獨有的靜態(tài)方法。

  • fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier():分別從 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中創(chuàng)建 Mono。
  • delay(Duration duration)和 delayMillis(long duration):創(chuàng)建一個 Mono 序列,在指定的延遲時間之后,產(chǎn)生數(shù)字 0 作為唯一值。
  • ignoreElements(Publisher<T> source):創(chuàng)建一個 Mono 序列,忽略作為源的 Publisher 中的所有元素,只產(chǎn)生結(jié)束消息。
  • justOrEmpty(Optional<? extends T> data)和 justOrEmpty(T data):從一個 Optional 對象或可能為 null 的對象中創(chuàng)建 Mono。只有 Optional 對象中包含值或?qū)ο蟛粸?null 時,Mono 序列才產(chǎn)生對應(yīng)的元素。

還可以通過 create()方法來使用 MonoSink 來創(chuàng)建 Mono。代碼清單 4 中給出了創(chuàng)建 Mono 序列的示例。

清單 4. 創(chuàng)建 Mono 序列

1

2

3

Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);

Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);

Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);

操作符

和 RxJava 一樣,Reactor 的強大之處在于可以在反應(yīng)式流上通過聲明式的方式添加多種不同的操作符。下面對其中重要的操作符進(jìn)行分類介紹。

buffer 和 bufferTimeout

這兩個操作符的作用是把當(dāng)前流中的元素收集到集合中,并把集合對象作為流中的新元素。在進(jìn)行收集時可以指定不同的條件:所包含的元素的最大數(shù)量或收集的時間間隔。方法 buffer()僅使用一個條件,而 bufferTimeout()可以同時指定兩個條件。指定時間間隔時可以使用 Duration 對象或毫秒數(shù),即使用 bufferMillis()或 bufferTimeoutMillis()兩個方法。

除了元素數(shù)量和時間間隔之外,還可以通過 bufferUntil 和 bufferWhile 操作符來進(jìn)行收集。這兩個操作符的參數(shù)是表示每個集合中的元素所要滿足的條件的 Predicate 對象。bufferUntil 會一直收集直到 Predicate 返回為 true。使得 Predicate 返回 true 的那個元素可以選擇添加到當(dāng)前集合或下一個集合中;bufferWhile 則只有當(dāng) Predicate 返回 true 時才會收集。一旦值為 false,會立即開始下一次收集。

代碼清單 5 給出了 buffer 相關(guān)操作符的使用示例。第一行語句輸出的是 5 個包含 20 個元素的數(shù)組;第二行語句輸出的是 2 個包含了 10 個元素的數(shù)組;第三行語句輸出的是 5 個包含 2 個元素的數(shù)組。每當(dāng)遇到一個偶數(shù)就會結(jié)束當(dāng)前的收集;第四行語句輸出的是 5 個包含 1 個元素的數(shù)組,數(shù)組里面包含的只有偶數(shù)。

需要注意的是,在代碼清單 5 中,首先通過 toStream()方法把 Flux 序列轉(zhuǎn)換成 Java 8 中的 Stream 對象,再通過 forEach()方法來進(jìn)行輸出。這是因為序列的生成是異步的,而轉(zhuǎn)換成 Stream 對象可以保證主線程在序列生成完成之前不會退出,從而可以正確地輸出序列中的所有元素。

清單 5. buffer 相關(guān)操作符的使用示例

1

2

3

4

Flux.range(1, 100).buffer(20).subscribe(System.out::println);

Flux.intervalMillis(100).bufferMillis(1001).take(2).toStream().forEach(System.out::println);

Flux.range(1, 10).bufferUntil(i -> i % 2 == 0).subscribe(System.out::println);

Flux.range(1, 10).bufferWhile(i -> i % 2 == 0).subscribe(System.out::println);

filter

對流中包含的元素進(jìn)行過濾,只留下滿足 Predicate 指定條件的元素。代碼清單 6 中的語句輸出的是 1 到 10 中的所有偶數(shù)。

清單 6. filter 操作符使用示例

1

Flux.range(1, 10).filter(i -> i % 2 == 0).subscribe(System.out::println);

window

window 操作符的作用類似于 buffer,所不同的是 window 操作符是把當(dāng)前流中的元素收集到另外的 Flux 序列中,因此返回值類型是 Flux<Flux<T>>。在代碼清單 7 中,兩行語句的輸出結(jié)果分別是 5 個和 2 個 UnicastProcessor 字符。這是因為 window 操作符所產(chǎn)生的流中包含的是 UnicastProcessor 類的對象,而 UnicastProcessor 類的 toString 方法輸出的就是 UnicastProcessor 字符。

清單 7. window 操作符使用示例

1

2

Flux.range(1, 100).window(20).subscribe(System.out::println);

Flux.intervalMillis(100).windowMillis(1001).take(2).toStream().forEach(System.out::println);

zipWith

zipWith 操作符把當(dāng)前流中的元素與另外一個流中的元素按照一對一的方式進(jìn)行合并。在合并時可以不做任何處理,由此得到的是一個元素類型為 Tuple2 的流;也可以通過一個 BiFunction 函數(shù)對合并的元素進(jìn)行處理,所得到的流的元素類型為該函數(shù)的返回值。

在代碼清單 8 中,兩個流中包含的元素分別是 a,b 和 c,d。第一個 zipWith 操作符沒有使用合并函數(shù),因此結(jié)果流中的元素類型為 Tuple2;第二個 zipWith 操作通過合并函數(shù)把元素類型變?yōu)?String。

清單 8. zipWith 操作符使用示例

1

2

3

4

5

6

Flux.just("a", "b")

????????.zipWith(Flux.just("c", "d"))

????????.subscribe(System.out::println);

Flux.just("a", "b")

????????.zipWith(Flux.just("c", "d"), (s1, s2) -> String.format("%s-%s", s1, s2))

????????.subscribe(System.out::println);

take

take 系列操作符用來從當(dāng)前流中提取元素。提取的方式可以有很多種。

  • take(long n),take(Duration timespan)和 takeMillis(long timespan):按照指定的數(shù)量或時間間隔來提取。
  • takeLast(long n):提取流中的最后 N 個元素。
  • takeUntil(Predicate<? super T> predicate):提取元素直到 Predicate 返回 true。
  • takeWhile(Predicate<? super T> continuePredicate): 當(dāng) Predicate 返回 true 時才進(jìn)行提取。
  • takeUntilOther(Publisher<?> other):提取元素直到另外一個流開始產(chǎn)生元素。

在代碼清單 9 中,第一行語句輸出的是數(shù)字 1 到 10;第二行語句輸出的是數(shù)字 991 到 1000;第三行語句輸出的是數(shù)字 1 到 9;第四行語句輸出的是數(shù)字 1 到 10,使得 Predicate 返回 true 的元素也是包含在內(nèi)的。

清單 9. take 系列操作符使用示例

1

2

3

4

Flux.range(1, 1000).take(10).subscribe(System.out::println);

Flux.range(1, 1000).takeLast(10).subscribe(System.out::println);

Flux.range(1, 1000).takeWhile(i -> i <?10).subscribe(System.out::println);

Flux.range(1, 1000).takeUntil(i -> i == 10).subscribe(System.out::println);

reduce 和 reduceWith

reduce 和 reduceWith 操作符對流中包含的所有元素進(jìn)行累積操作,得到一個包含計算結(jié)果的 Mono 序列。累積操作是通過一個 BiFunction 來表示的。在操作時可以指定一個初始值。如果沒有初始值,則序列的第一個元素作為初始值。

在代碼清單 10 中,第一行語句對流中的元素進(jìn)行相加操作,結(jié)果為 5050;第二行語句同樣也是進(jìn)行相加操作,不過通過一個 Supplier 給出了初始值為 100,所以結(jié)果為 5150。

清單 10. reduce 和 reduceWith 操作符使用示例

1

2

Flux.range(1, 100).reduce((x, y) -> x + y).subscribe(System.out::println);

Flux.range(1, 100).reduceWith(() -> 100, (x, y) -> x + y).subscribe(System.out::println);

merge 和 mergeSequential

merge 和 mergeSequential 操作符用來把多個流合并成一個 Flux 序列。不同之處在于 merge 按照所有流中元素的實際產(chǎn)生順序來合并,而 mergeSequential 則按照所有流被訂閱的順序,以流為單位進(jìn)行合并。

代碼清單 11 中分別使用了 merge 和 mergeSequential 操作符。進(jìn)行合并的流都是每隔 100 毫秒產(chǎn)生一個元素,不過第二個流中的每個元素的產(chǎn)生都比第一個流要延遲 50 毫秒。在使用 merge 的結(jié)果流中,來自兩個流的元素是按照時間順序交織在一起;而使用 mergeSequential 的結(jié)果流則是首先產(chǎn)生第一個流中的全部元素,再產(chǎn)生第二個流中的全部元素。

清單 11. merge 和 mergeSequential 操作符使用示例

1

2

3

4

5

6

Flux.merge(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5))

????????.toStream()

????????.forEach(System.out::println);

Flux.mergeSequential(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5))

????????.toStream()

????????.forEach(System.out::println);

flatMap 和 flatMapSequential

flatMap 和 flatMapSequential 操作符把流中的每個元素轉(zhuǎn)換成一個流,再把所有流中的元素進(jìn)行合并。flatMapSequential 和 flatMap 之間的區(qū)別與 mergeSequential 和 merge 之間的區(qū)別是一樣的。

在代碼清單 12 中,流中的元素被轉(zhuǎn)換成每隔 100 毫秒產(chǎn)生的數(shù)量不同的流,再進(jìn)行合并。由于第一個流中包含的元素數(shù)量較少,所以在結(jié)果流中一開始是兩個流的元素交織在一起,然后就只有第二個流中的元素。

清單 12. flatMap 操作符使用示例

1

2

3

4

Flux.just(5, 10)

????????.flatMap(x -> Flux.intervalMillis(x * 10, 100).take(x))

????????.toStream()

????????.forEach(System.out::println);

concatMap

concatMap 操作符的作用也是把流中的每個元素轉(zhuǎn)換成一個流,再把所有流進(jìn)行合并。與 flatMap 不同的是,concatMap 會根據(jù)原始流中的元素順序依次把轉(zhuǎn)換之后的流進(jìn)行合并;與 flatMapSequential 不同的是,concatMap 對轉(zhuǎn)換之后的流的訂閱是動態(tài)進(jìn)行的,而 flatMapSequential 在合并之前就已經(jīng)訂閱了所有的流。

代碼清單 13 與代碼清單 12 類似,只不過把 flatMap 換成了 concatMap,結(jié)果流中依次包含了第一個流和第二個流中的全部元素。

清單 13. concatMap 操作符使用示例

1

2

3

4

Flux.just(5, 10)

????????.concatMap(x -> Flux.intervalMillis(x * 10, 100).take(x))

????????.toStream()

????????.forEach(System.out::println);

combineLatest

combineLatest 操作符把所有流中的最新產(chǎn)生的元素合并成一個新的元素,作為返回結(jié)果流中的元素。只要其中任何一個流中產(chǎn)生了新的元素,合并操作就會被執(zhí)行一次,結(jié)果流中就會產(chǎn)生新的元素。在 代碼清單 14 中,流中最新產(chǎn)生的元素會被收集到一個數(shù)組中,通過 Arrays.toString 方法來把數(shù)組轉(zhuǎn)換成 String。

清單 14. combineLatest 操作符使用示例

1

2

3

4

5

Flux.combineLatest(

????????Arrays::toString,

????????Flux.intervalMillis(100).take(5),

????????Flux.intervalMillis(50, 100).take(5)

).toStream().forEach(System.out::println);

消息處理

當(dāng)需要處理 Flux 或 Mono 中的消息時,如之前的代碼清單所示,可以通過 subscribe 方法來添加相應(yīng)的訂閱邏輯。在調(diào)用 subscribe 方法時可以指定需要處理的消息類型。可以只處理其中包含的正常消息,也可以同時處理錯誤消息和完成消息。代碼清單 15 中通過 subscribe()方法同時處理了正常消息和錯誤消息。

清單 15. 通過 subscribe()方法處理正常和錯誤消息

1

2

3

Flux.just(1, 2)

????????.concatWith(Mono.error(new IllegalStateException()))

????????.subscribe(System.out::println, System.err::println);

正常的消息處理相對簡單。當(dāng)出現(xiàn)錯誤時,有多種不同的處理策略。第一種策略是通過 onErrorReturn()方法返回一個默認(rèn)值。在代碼清單 16 中,當(dāng)出現(xiàn)錯誤時,流會產(chǎn)生默認(rèn)值 0.

清單 16. 出現(xiàn)錯誤時返回默認(rèn)值

1

2

3

4

Flux.just(1, 2)

????????.concatWith(Mono.error(new IllegalStateException()))

????????.onErrorReturn(0)

????????.subscribe(System.out::println);

第二種策略是通過 switchOnError()方法來使用另外的流來產(chǎn)生元素。在代碼清單 17 中,當(dāng)出現(xiàn)錯誤時,將產(chǎn)生 Mono.just(0)對應(yīng)的流,也就是數(shù)字 0。

清單 17. 出現(xiàn)錯誤時使用另外的流

1

2

3

4

Flux.just(1, 2)

????????.concatWith(Mono.error(new IllegalStateException()))

????????.switchOnError(Mono.just(0))

????????.subscribe(System.out::println);

第三種策略是通過 onErrorResumeWith()方法來根據(jù)不同的異常類型來選擇要使用的產(chǎn)生元素的流。在代碼清單 18 中,根據(jù)異常類型來返回不同的流作為出現(xiàn)錯誤時的數(shù)據(jù)來源。因為異常的類型為 IllegalArgumentException,所產(chǎn)生的元素為-1。

清單 18. 出現(xiàn)錯誤時根據(jù)異常類型來選擇流

1

2

3

4

5

6

7

8

9

10

11

Flux.just(1, 2)

????????.concatWith(Mono.error(new IllegalArgumentException()))

????????.onErrorResumeWith(e -> {

????????????if (e instanceof IllegalStateException) {

????????????????return Mono.just(0);

????????????} else if (e instanceof IllegalArgumentException) {

????????????????return Mono.just(-1);

????????????}

????????????return Mono.empty();

????????})

????????.subscribe(System.out::println);

當(dāng)出現(xiàn)錯誤時,還可以通過 retry 操作符來進(jìn)行重試。重試的動作是通過重新訂閱序列來實現(xiàn)的。在使用 retry 操作符時可以指定重試的次數(shù)。代碼清單 19 中指定了重試次數(shù)為 1,所輸出的結(jié)果是 1,2,1,2 和錯誤信息。

清單 19. 使用 retry 操作符進(jìn)行重試

1

2

3

4

Flux.just(1, 2)

????????.concatWith(Mono.error(new IllegalStateException()))

????????.retry(1)

????????.subscribe(System.out::println);

調(diào)度器

前面介紹了反應(yīng)式流和在其上可以進(jìn)行的各種操作,通過調(diào)度器(Scheduler)可以指定這些操作執(zhí)行的方式和所在的線程。有下面幾種不同的調(diào)度器實現(xiàn)。

  • 當(dāng)前線程,通過 Schedulers.immediate()方法來創(chuàng)建。
  • 單一的可復(fù)用的線程,通過 Schedulers.single()方法來創(chuàng)建。
  • 使用彈性的線程池,通過 Schedulers.elastic()方法來創(chuàng)建。線程池中的線程是可以復(fù)用的。當(dāng)所需要時,新的線程會被創(chuàng)建。如果一個線程閑置太長時間,則會被銷毀。該調(diào)度器適用于 I/O 操作相關(guān)的流的處理。
  • 使用對并行操作優(yōu)化的線程池,通過 Schedulers.parallel()方法來創(chuàng)建。其中的線程數(shù)量取決于 CPU 的核的數(shù)量。該調(diào)度器適用于計算密集型的流的處理。
  • 使用支持任務(wù)調(diào)度的調(diào)度器,通過 Schedulers.timer()方法來創(chuàng)建。
  • 從已有的 ExecutorService 對象中創(chuàng)建調(diào)度器,通過 Schedulers.fromExecutorService()方法來創(chuàng)建。

某些操作符默認(rèn)就已經(jīng)使用了特定類型的調(diào)度器。比如 intervalMillis()方法創(chuàng)建的流就使用了由 Schedulers.timer()創(chuàng)建的調(diào)度器。通過 publishOn()和 subscribeOn()方法可以切換執(zhí)行操作的調(diào)度器。其中 publishOn()方法切換的是操作符的執(zhí)行方式,而 subscribeOn()方法切換的是產(chǎn)生流中元素時的執(zhí)行方式。

在代碼清單 20 中,使用 create()方法創(chuàng)建一個新的 Flux 對象,其中包含唯一的元素是當(dāng)前線程的名稱。接著是兩對 publishOn()和 map()方法,其作用是先切換執(zhí)行時的調(diào)度器,再把當(dāng)前的線程名稱作為前綴添加。最后通過 subscribeOn()方法來改變流產(chǎn)生時的執(zhí)行方式。運行之后的結(jié)果是[elastic-2] [single-1] parallel-1。最內(nèi)層的線程名字 parallel-1 來自產(chǎn)生流中元素時使用的 Schedulers.parallel()調(diào)度器,中間的線程名稱 single-1 來自第一個 map 操作之前的 Schedulers.single()調(diào)度器,最外層的線程名字 elastic-2 來自第二個 map 操作之前的 Schedulers.elastic()調(diào)度器。

清單 20. 使用調(diào)度器切換操作符執(zhí)行方式

1

2

3

4

5

6

7

8

9

10

11

Flux.create(sink -> {

????sink.next(Thread.currentThread().getName());

????sink.complete();

})

.publishOn(Schedulers.single())

.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))

.publishOn(Schedulers.elastic())

.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))

.subscribeOn(Schedulers.parallel())

.toStream()

.forEach(System.out::println);

測試

在對使用 Reactor 的代碼進(jìn)行測試時,需要用到 io.projectreactor.addons:reactor-test 庫。

使用 StepVerifier

進(jìn)行測試時的一個典型的場景是對于一個序列,驗證其中所包含的元素是否符合預(yù)期。StepVerifier 的作用是可以對序列中包含的元素進(jìn)行逐一驗證。在代碼清單 21 中,需要驗證的流中包含 a 和 b 兩個元素。通過 StepVerifier.create()方法對一個流進(jìn)行包裝之后再進(jìn)行驗證。expectNext()方法用來聲明測試時所期待的流中的下一個元素的值,而 verifyComplete()方法則驗證流是否正常結(jié)束。類似的方法還有 verifyError()來驗證流由于錯誤而終止。

清單 21. 使用 StepVerifier 驗證流中的元素

1

2

3

4

StepVerifier.create(Flux.just("a", "b"))

????????.expectNext("a")

????????.expectNext("b")

????????.verifyComplete();

操作測試時間

有些序列的生成是有時間要求的,比如每隔 1 分鐘才產(chǎn)生一個新的元素。在進(jìn)行測試中,不可能花費實際的時間來等待每個元素的生成。此時需要用到 StepVerifier 提供的虛擬時間功能。通過 StepVerifier.withVirtualTime()方法可以創(chuàng)建出使用虛擬時鐘的 StepVerifier。通過 thenAwait(Duration)方法可以讓虛擬時鐘前進(jìn)。

在代碼清單 22 中,需要驗證的流中包含兩個產(chǎn)生間隔為一天的元素,并且第一個元素的產(chǎn)生延遲是 4 個小時。在通過 StepVerifier.withVirtualTime()方法包裝流之后,expectNoEvent()方法用來驗證在 4 個小時之內(nèi)沒有任何消息產(chǎn)生,然后驗證第一個元素 0 產(chǎn)生;接著 thenAwait()方法來讓虛擬時鐘前進(jìn)一天,然后驗證第二個元素 1 產(chǎn)生;最后驗證流正常結(jié)束。

清單 22. 操作測試時間

1

2

3

4

5

6

7

StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofHours(4), Duration.ofDays(1)).take(2))

????????.expectSubscription()

????????.expectNoEvent(Duration.ofHours(4))

????????.expectNext(0L)

????????.thenAwait(Duration.ofDays(1))

????????.expectNext(1L)

????????.verifyComplete();

使用 TestPublisher

TestPublisher 的作用在于可以控制流中元素的產(chǎn)生,甚至是違反反應(yīng)流規(guī)范的情況。在代碼清單 23 中,通過 create()方法創(chuàng)建一個新的 TestPublisher 對象,然后使用 next()方法來產(chǎn)生元素,使用 complete()方法來結(jié)束流。TestPublisher 主要用來測試開發(fā)人員自己創(chuàng)建的操作符。

清單 23. 使用 TestPublisher 創(chuàng)建測試所用的流

1

2

3

4

5

6

7

8

9

final TestPublisher<String> testPublisher = TestPublisher.create();

testPublisher.next("a");

testPublisher.next("b");

testPublisher.complete();

StepVerifier.create(testPublisher)

????????.expectNext("a")

????????.expectNext("b")

????????.expectComplete();

調(diào)試

由于反應(yīng)式編程范式與傳統(tǒng)編程范式的差異性,使用 Reactor 編寫的代碼在出現(xiàn)問題時比較難進(jìn)行調(diào)試。為了更好的幫助開發(fā)人員進(jìn)行調(diào)試,Reactor 提供了相應(yīng)的輔助功能。

啟用調(diào)試模式

當(dāng)需要獲取更多與流相關(guān)的執(zhí)行信息時,可以在程序開始的地方添加代碼清單 24 中的代碼來啟用調(diào)試模式。在調(diào)試模式啟用之后,所有的操作符在執(zhí)行時都會保存額外的與執(zhí)行鏈相關(guān)的信息。當(dāng)出現(xiàn)錯誤時,這些信息會被作為異常堆棧信息的一部分輸出。通過這些信息可以分析出具體是在哪個操作符的執(zhí)行中出現(xiàn)了問題。

清單 24. 啟用調(diào)試模式

1

Hooks.onOperator(providedHook -> providedHook.operatorStacktrace());

不過當(dāng)調(diào)試模式啟用之后,記錄這些額外的信息是有代價的。一般只有在出現(xiàn)了錯誤之后,再考慮啟用調(diào)試模式。但是當(dāng)為了找到問題而啟用了調(diào)試模式之后,之前的錯誤不一定能很容易重現(xiàn)出來。為了減少可能的開銷,可以限制只對特定類型的操作符啟用調(diào)試模式。

使用檢查點

另外一種做法是通過 checkpoint 操作符來對特定的流處理鏈來啟用調(diào)試模式。代碼清單 25 中,在 map 操作符之后添加了一個名為 test 的檢查點。當(dāng)出現(xiàn)錯誤時,檢查點名稱會出現(xiàn)在異常堆棧信息中。對于程序中重要或者復(fù)雜的流處理鏈,可以在關(guān)鍵的位置上啟用檢查點來幫助定位可能存在的問題。

清單 25. 使用 checkpoint 操作符

1

Flux.just(1, 0).map(x -> 1 / x).checkpoint("test").subscribe(System.out::println);

日志記錄

在開發(fā)和調(diào)試中的另外一項實用功能是把流相關(guān)的事件記錄在日志中。這可以通過添加 log 操作符來實現(xiàn)。在代碼清單 26 中,添加了 log 操作符并指定了日志分類的名稱。

清單 26. 使用 log 操作符記錄事件

1

Flux.range(1, 2).log("Range").subscribe(System.out::println);

在實際的運行時,所產(chǎn)生的輸出如代碼清單 27 所示。

清單 27. log 操作符所產(chǎn)生的日志

1

2

3

4

5

6

7

8

13:07:56.735 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework

13:07:56.751 [main] INFO Range - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)

13:07:56.753 [main] INFO Range - | request(unbounded)

13:07:56.754 [main] INFO Range - | onNext(1)

1

13:07:56.754 [main] INFO Range - | onNext(2)

2

13:07:56.754 [main] INFO Range - | onComplete()

“冷”與“熱”序列

之前的代碼清單中所創(chuàng)建的都是冷序列。冷序列的含義是不論訂閱者在何時訂閱該序列,總是能收到序列中產(chǎn)生的全部消息。而與之對應(yīng)的熱序列,則是在持續(xù)不斷地產(chǎn)生消息,訂閱者只能獲取到在其訂閱之后產(chǎn)生的消息。

在代碼清單 28 中,原始的序列中包含 10 個間隔為 1 秒的元素。通過 publish()方法把一個 Flux 對象轉(zhuǎn)換成 ConnectableFlux 對象。方法 autoConnect()的作用是當(dāng) ConnectableFlux 對象有一個訂閱者時就開始產(chǎn)生消息。代碼 source.subscribe()的作用是訂閱該 ConnectableFlux 對象,讓其開始產(chǎn)生數(shù)據(jù)。接著當(dāng)前線程睡眠 5 秒鐘,第二個訂閱者此時只能獲得到該序列中的后 5 個元素,因此所輸出的是數(shù)字 5 到 9。

清單 28. 熱序列

1

2

3

4

5

6

7

8

9

final Flux<Long> source = Flux.intervalMillis(1000)

????????.take(10)

????????.publish()

????????.autoConnect();

source.subscribe();

Thread.sleep(5000);

source

????????.toStream()

????????.forEach(System.out::println);

小結(jié)

反應(yīng)式編程范式對于習(xí)慣了傳統(tǒng)編程范式的開發(fā)人員來說,既是一個需要進(jìn)行思維方式轉(zhuǎn)變的挑戰(zhàn),也是一個充滿了更多可能的機會。Reactor 作為一個基于反應(yīng)式流規(guī)范的新的 Java 庫,可以作為反應(yīng)式應(yīng)用的基礎(chǔ)。本文對 Reactor 庫做了詳細(xì)的介紹,包括 Flux 和 Mono 序列的創(chuàng)建、常用操作符的使用、調(diào)度器、錯誤處理以及測試和調(diào)試技巧等。

參考資源 (resources)

  • 參考 Reactor 的官方網(wǎng)站,了解 Reactor 的更多內(nèi)容。
  • 查看 Reactor 的用戶指南。
  • 查看 InfoQ 上的Reactor by Example。
  • 查看反應(yīng)式流規(guī)范。

資源下載(Reactor例子)

Reactor教程Mono和Flux例子_monoreactor,flux和mono-Java代碼類資源-CSDN下載

來源:使用Reactor進(jìn)行反應(yīng)式編程最全教程_hhaip-CSDN博客_reactor教程

與50位技術(shù)專家面對面20年技術(shù)見證,附贈技術(shù)全景圖

總結(jié)

以上是生活随笔為你收集整理的使用Reactor进行反应式编程最全教程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。