Mono 的创建
目錄
- 前言
- 一、響應式數據流
- 二、Mono的創建
前言
????????Reactor 是一個基于 JVM 之上的異步應用基礎庫。為 Java 、Groovy 和其他 JVM 語言提供了構建基于事件和數據驅動應用的抽象庫,可用它構建時效性流式數據應用,實現應用高效、異步地傳遞消息。
????????Reactor 性能相當高,在最新的硬件平臺上,使用無堵塞分發器每秒鐘可處理 1500 萬事件。
????????Reactor 中的兩個重要的概念是Flux 和 Mono。Mono 表示包含 0 或者 1 個元素的異步序列。Flux 表示包含 0 到 N 個元素的異步序列。他們包含三種不同類型的消息通知:正常的包含元素的消息、序列結束的消息和序列出錯的消息。
一、響應式數據流
????????響應式數據流 作為一種新的數據流規范應用于 Java 9 及其后續版本,旨在提供同/異步數據序列流式控制機制。
1、響應式數據流接口
org.reactivestreams.Pubslisher:數據流發布者(信號從 0 到 N,N 可為無窮)。提供兩個可選終端事件:錯誤和完成。
org.reactivestreams.Subscriber:數據流消費者(信號從 0 到 N,N 可為無窮)。消費者初始化過程中,會請求生產者當前需要訂閱多少數據。其他情況,通過接口回調與數據生產方交互: 下一條(新消息)和狀態。狀態包括:完成/錯誤,可選。
org.reactivestreams.Subscription:初始化階段將一個小追蹤器傳遞給訂閱者。它控制著我們準備好來消費多少數據,以及我們想要什么時候停止消費(取消)。
org.reactivestreams.Processor:同時作為發布者和訂閱者的組件的標記。
2、響應式數據流發布協議
訂閱者有兩種方式向發布者請求數據:
無界的:訂閱者只需要調用 Subscription#request(Long.MAX_VALUE) 即可。
有界的:訂閱者保留數據引用,調用request(long) 方法消費。
二、Mono的創建
(1)empty
不包含任何元素,可以發布結束消息
@Testpublic void empty(){Mono.empty().subscribe(System.out::println);}(2)just
包含指定元素
@Testpublic void just(){Mono.just("hello mono").subscribe(System.out::println);}(3)justOrEmpty
有元素時相當于just,
沒有元素時相當于empty,
元素是Optional時,則根據Optional里是否有值來創建just或empty。
(4)never
不包含任何元素
@Testpublic void never(){Mono.never().subscribe(System.out::println);}(5)from
@Testpublic void from() {//從 Publisher 生成 MonoMono.from(Mono.just("hello mono")).subscribe(System.out::println);//從 Publisher 生成 Mono,會對Flux類型進行包裝Mono.fromDirect(Mono.just("hello mono")).subscribe(System.out::println);//從 Supplier 生成 MonoMono.fromSupplier(() -> "hello mono").subscribe(System.out::println);//從 Runnable 生成 MonoMono.fromRunnable(() -> System.out.println("hello mono")).subscribe(System.out::println);//從 Callable 生成 MonoMono.fromCallable(() ->"hello mono").subscribe(System.out::println);//從 CompletableFuture 生成 MonoMono.fromFuture(CompletableFuture.completedFuture("hello mono")).subscribe(System.out::println);//從 CompletionStage 生成 MonoMono.fromCompletionStage(CompletableFuture.completedFuture("hello mono")).subscribe(System.out::println);//從 Supplier<? extends CompletionStage<? extends T> 生成 MonoMono.fromCompletionStage(() -> CompletableFuture.completedFuture("hello mono")).subscribe(System.out::println);}(6)defer
@Testpublic void defer() {//從 Supplier 獲取 monoMono.defer(() -> Mono.just("hello mono")).subscribe(System.out::println);//從 Function<ContextView, ? extends Mono<? extends T>> 獲取 monoMono.deferContextual(view -> Mono.just("hello mono")).subscribe(System.out::println);}(7)delay
@Testpublic void delay() throws InterruptedException {//指定延時時間,發布值是0Mono.delay(Duration.of(5, ChronoUnit.SECONDS)).subscribe(System.out::println);Mono.delay(Duration.of(5, ChronoUnit.SECONDS), Schedulers.parallel()).subscribe(System.out::println);TimeUnit.SECONDS.sleep(15);}(8)error
@Testpublic void error() throws InterruptedException {//包含異常的 monoMono.error(new RuntimeException("出錯了")).subscribe(System.out::println);//Supplier 提供包含異常的 monoMono.error(() -> new RuntimeException("出錯了")).subscribe(System.out::println);}(9)first
@Testpublic void first() throws InterruptedException {//處理第一個 mono,如果第一個取消了,處理第二個...Mono.firstWithSignal(Mono.empty(), Mono.just("mono hello")).subscribe(System.out::println);Mono.firstWithValue(Mono.just("hello mono"), Mono.just("mono hello")).subscribe(System.out::println);Mono.firstWithSignal(List.of(Mono.just("hello mono"), Mono.just("mono hello"))).subscribe(System.out::println);Mono.firstWithValue(List.of(Mono.just("hello mono"), Mono.just("mono hello"))).subscribe(System.out::println);}(10)sequenceEqual
@Testpublic void sequenceEqual() throws InterruptedException {//比較兩個 mono 是都相同Mono.sequenceEqual(Mono.just("hello mono"), Mono.just("hello mono")).subscribe(System.out::println);Mono.sequenceEqual(Mono.just("hello mono"), Mono.just("hello mono"), Object::equals).subscribe(System.out::println);Mono.sequenceEqual(Mono.just("hello mono"), Mono.just("hello mono"), Object::equals, 16).subscribe(System.out::println);}(11)using
@Testpublic void using() throws InterruptedException {//callable返回mono,//function對mono進行操作//consumer執行清理操作//eager 為true時,consumer在subscribe之前調用Mono.using(() -> Mono.just("hello mono"), Function.identity(), t -> System.out.println(t)).subscribe(System.out::println);}(12)when
@Testpublic void when() throws InterruptedException {//執行預設的操作Mono.when(Mono.just("hello mono").filter(a -> a.equals("hello mono"))).subscribe(System.out::println);}(13)zip
壓縮操作
總結
- 上一篇: 为win7系统盘减肥
- 下一篇: Objective-C——initial