日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

响应式web(三):服务当中的三种耦合,流式计算,RXJava2,Flux,Mono

發布時間:2024/2/28 java 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 响应式web(三):服务当中的三种耦合,流式计算,RXJava2,Flux,Mono 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

響應式 Web 第三節

  • 服務調用中的三種耦合
  • 響應式流規范與接口
  • 響應式流中的流量控制
  • Web中的響應式與請求/響應式的區別
  • 流式處理中的Source/Sink模型
  • RXJava2 觀察者模式同步與異步實現
  • Project Reactor 中的 Flux、Mono
  • Flux、Mono 同步靜態創建與異步動態創建
  • WebFlux

服務當中的耦合

在調用服務的時候,總會有耦合,基于rmi的

1、技術耦合:dubbo,典型的基于rpc的遠程服務調用,兩邊都是java才能調用。

2、空間耦合:兩臺機器的依賴

3、時間耦合:服務的可用、不可用

微服務 就巧妙地解決了這三個維度上的耦合,但是所有調用幾乎都是同步調用。
異步調用能提升整體的性能嗎?不能,但是它能夠提高整體的吞吐量,防止雪崩

對于傳統編程模型的web服務:

  • 訪問量過大,web服務可能會oom,瀏覽器/app一次接這么多數據可能也會扛不住
  • 而且前端的展示要等待傳輸的過程

解決方法:分頁。
分頁缺點:只能追加,不能在中間插入,否則會在分頁取數據的時候發生混亂。也可以通過編碼解決,但是會增加整體業務的復雜度。如果使用私有數據的話,你會和別人看到的數據不一樣。
分頁缺點解決方法:響應式編程,基于發布/訂閱模型

發布/訂閱模型

  • mq:做數據緩沖、通知,不做持久化,數據可以推過去,或者主動去拉也可以
  • zk
  • sse:server sent push

List底層是數組,是固定長度的;Flux底層是流,是可變長度的,流的大小取決于緩沖區的大小。


響應式數據庫:例如,某個用戶發送短信超過100條之后,會反過來去回調服務的接口。

設置邊界:到達邊界之后,就流向server/service,要考慮一次流多少:如果流多了,會造成流量過大,解決方法:加緩沖區,可以在服務端加緩沖區,也可以在客戶端加緩沖區。
推送數據:超過客戶端的臨界值怎么辦?丟棄策略
拉取數據:rocketmq是拉數據
推數據和拉數據,都是流式計算的概念

流式計算

Flume,Flink都是處理流的。
大數據技術棧中,引入了很多先進的概念,web架構中沒有的。
Flume用來做大數據中對于日志的拉取。
Flink
source,channel,sink
source:數據源
channel:緩沖區
sink:目的地

處理數據:同步/異步
Flux<T>:可以裝0~n個數據
Mono:只能裝一個數據

背壓處理,慢消費,同一線程,好控制

響應式流的規范:Reactive規范

  • Reactive是響應式,jdk9引入了響應式的接口。
  • Project Reactor,RXJava是響應式的框架。RXJava在安卓領域用的比較多
  • webflux也是響應式框架,將servlet換成了netty或servlet3

代碼示例

Project Reactor

官網
https://projectreactor.io/

Reactor 是Spring5中構建各個響應式組件的基礎框架,內部提供了Flux和Mono兩個代表異步數據序列的核心組件。

Flux

靜態方法生成

// 靜態方法生成FluxString[] s = new String[] {"xx","oo"};// just 已知元素數量和內容 使用// Flux<String> flux1 = Flux.just(s); // flux1.subscribe(System.out::println);Flux<String> flux2 = Flux.just("xx","xxx"); // flux2.subscribe(System.out::println);//fromArray方法List<String> list = Arrays.asList("hello", "world");Flux<String> flux3 = Flux.fromIterable(list);// flux3.subscribe(System.out::println);//fromStream方法Stream<String> stream = Stream.of("hi", "hello");Flux<String> flux4 = Flux.fromStream(stream);// flux4.subscribe(System.out::println);//range方法Flux<Integer> range = Flux.range(0, 5);// range.subscribe(System.out::println);//interval方法, take方法限制個數為5個Flux<Long> longFlux = Flux.interval(Duration.ofSeconds(1)).take(5);longFlux.subscribe(System.out::println);//鏈式Flux.range(1, 5).subscribe(System.out::println); } //鏈式Flux.range(1, 5).subscribe(System.out::println);// 合并Flux<String> mergeWith = flux3.mergeWith(flux4);mergeWith.subscribe(System.out::println);System.out.println("---");// 結合為元祖Flux<String> source1 = Flux.just("111", "world","333");Flux<String> source2 = Flux.just("2111", "xxx");Flux<Tuple2<String, String>> zip = source1.zipWith(source2);zip.subscribe(tuple -> {System.out.println(tuple.getT1() + " -> " + tuple.getT2());}); // 跳過兩個Flux<String> flux = Flux.just("1111", "222", "333");Flux<String> skip = flux.skip(2);skip.subscribe(System.out::println);// 拿前幾個Flux<String> flux2 = Flux.just("1111", "222", "333");Flux<String> skip2 = flux2.take(2);skip2.subscribe(System.out::println);// 過濾Flux<String> flux = Flux.just("xx", "oo", "x1x");Flux<String> filter = flux.filter(s -> s.startsWith("x"));filter.subscribe(System.out::println);// 去重Flux<String> flux = Flux.just("xx", "oo", "x1x","x2x");Flux<String> filter = flux.filter(s -> s.startsWith("x")).distinct();filter.subscribe(System.out::println);// 轉 MonoFlux<String> flux = Flux.just("xx", "oo", "x1x","x2x");Mono<List<String>> mono = flux.collectList();mono.subscribe(System.out::println);// 邏輯運算 all 與 anyFlux<String> flux = Flux.just("xx", "oox", "x1x","x2x");Mono<Boolean> mono = flux.all(s -> s.contains("x"));mono.subscribe(System.out::println);

Mono 連接

Flux<String> concatWith = Mono.just("100").concatWith(Mono.just("100"));concatWith.subscribe(System.out::println);

異常處理

Mono.just("100").concatWith(Mono.error(new Exception("xx"))).onErrorReturn("xxx").subscribe(System.out::println)

動態創建

// 同步動態創建,next 只能被調用一次Flux.generate(sink -> {sink.next("xx");sink.complete();}).subscribe(System.out::print);} Flux.create(sink -> {for (int i = 0; i < 10; i++) {sink.next("xxoo:" + i);}sink.complete();}).subscribe(System.out::println);}

WebFlux

RXJava2

http://reactivex.io/#

Reactive Extensions

同步

哪個線程產生就在哪個線程消費

maven依賴

<!-- https://mvnrepository.com/artifact/io.reactivex.rxjava2/rxjava --> <dependency><groupId>io.reactivex.rxjava2</groupId><artifactId>rxjava</artifactId> </dependency>

main

public static void main(String[] args) {Observable<String> girl = Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> emitter) throws Exception {emitter.onNext("1");emitter.onNext("2");emitter.onNext("3");emitter.onNext("4");emitter.onNext("5");emitter.onComplete();}});// 觀察者Observer<String> man = new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {// TODO Auto-generated method stubSystem.out.println("onSubscribe" + d);}@Overridepublic void onNext(String t) {// TODO Auto-generated method stubSystem.out.println("onNext " + t);}@Overridepublic void onError(Throwable e) {// TODO Auto-generated method stubSystem.out.println("onError " + e.getMessage());}@Overridepublic void onComplete() {// TODO Auto-generated method stubSystem.out.println("onComplete");}};girl.subscribe(man);}

異步

方法說明
Schedulers.computation()適用于計算密集型任務
Schedulers.io()適用于 IO 密集型任務
Schedulers.trampoline()在某個調用 schedule 的線程執行
Schedulers.newThread()每個 Worker 對應一個新線程
Schedulers.single()所有 Worker 使用同一個線程執行任務
Schedulers.from(Executor)使用 Executor 作為任務執行的線程
public static void main(String[] args) throws InterruptedException {Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> emitter) throws Exception {emitter.onNext("1");emitter.onNext("2");emitter.onNext("3");emitter.onNext("4");emitter.onNext("5");emitter.onComplete(); }}).observeOn(Schedulers.computation()).subscribeOn( Schedulers.computation()).subscribe(new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {// TODO Auto-generated method stubSystem.out.println("onSubscribe");}@Overridepublic void onNext(String t) {// TODO Auto-generated method stubSystem.out.println("onNext");}@Overridepublic void onError(Throwable e) {// TODO Auto-generated method stubSystem.out.println("onError");}@Overridepublic void onComplete() {// TODO Auto-generated method stubSystem.out.println("onComplete");}});Thread.sleep(10000); }

下節課,我們講WebFlux的應用~

超強干貨來襲 云風專訪:近40年碼齡,通宵達旦的技術人生

總結

以上是生活随笔為你收集整理的响应式web(三):服务当中的三种耦合,流式计算,RXJava2,Flux,Mono的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 叼嘿视频在线免费观看 | 免费视频一二三区 | 亚洲色图p | 欧美国产成人在线 | 青娱乐在线免费观看 | 国产91沙发系列 | 免费黄色成人 | 日本视频在线看 | bt天堂新版中文在线地址 | 亚洲永久免费精品 | 伊人久久精品视频 | 男人天堂网站 | 一区二区三区国产视频 | 中文日韩字幕 | 日韩特一级 | 久久xxxx| 麻豆91在线播放 | 国产在线视频一区 | 一级黄色免费看 | 野外做受又硬又粗又大视频√ | 丁香花在线影院观看在线播放 | 久久久久九九 | 国产嫩草影视 | 五月中文字幕 | 久久精品成人一区二区三区蜜臀 | 国产精品乱子伦 | av无毛| www.4hu95.com四虎 极品在线视频 | 国产亚洲AV无码成人网站在线 | www.亚洲激情| 一级片成人 | 久久久久99精品成人片试看 | 成人午夜免费福利视频 | 青娱乐在线播放 | 久久久久久久久久久99 | 中文字幕视频免费 | 女王脚交玉足榨精调教 | 免费观看日韩av | 午夜影院操| 亚洲性一区 | 精品在线视频一区二区 | 丰满人妻一区二区三区无码av | 欧美揉bbbbb揉bbbbb | 日本大胆裸体做爰视频 | 国产天堂一区 | 免费特级黄毛片 | 中国av毛片 | 污网站在线播放 | 台湾三级伦理片 | 97成人免费视频 | 国产亚洲不卡 | 自拍偷拍福利 | 久久精品无码专区免费 | 一本黄色片 | 欧美精品欧美极品欧美激情 | 嫩草视频网站 | 国产精品xxx在线观看 | www.色人阁| 伊人超碰在线 | 精品国产乱码久久久久久蜜臀 | 高清无码一区二区在线观看吞精 | 亚洲va欧美| 狠狠干2024 | 欧美精品一 | 亚洲永久在线观看 | 青青草一区二区三区 | 中文字幕韩日 | av网站在线看 | 欧美午夜精品一区二区三区电影 | 波多野结衣网站 | 黄色片在线播放 | 韩国一区二区在线播放 | 亚洲第一二区 | 蜜桃av噜噜一区二区三区麻豆 | 99re6热在线精品视频播放 | 日韩av资源在线观看 | 波多野结衣中文字幕一区二区 | 激情五月综合网 | 国产精品一区二区三区不卡 | 色综合天天网 | 免费三级黄 | 欧美xxxx吸乳| 视频日韩 | 三级在线网址 | 泷泽萝拉在线播放 | 老牛嫩草二区三区观影体验 | 老头巨大又粗又长xxxxx | 亚洲国产成人无码av在线 | 欧美精品卡一卡二 | 中文字幕观看视频 | 超碰997| 日韩av无码一区二区三区不卡 | 欧美黄色小说 | 四级毛片 | 亚洲成在线 | 欧美日韩一区免费 | 日av在线播放 | 白浆一区| 日本少妇裸体 |