response获取响应内容_Project Reactor 深度解析 - 1. 响应式编程介绍,实现以及现有问题
現(xiàn)在, Java 的各種基于 Reactor 模型的響應(yīng)式編程庫或者框架越來越多了,像是 RxJava,Project Reactor,Vert.x 等等等等。在 Java 9, Java 也引入了自己的 響應(yīng)式編程的一種標(biāo)準(zhǔn)接口,即java.util.concurrent.Flow這個類。這個類里面規(guī)定了 Java 響應(yīng)式編程所要實現(xiàn)的接口與抽象。我們這個系列要討論的就是Project Reactor這個實現(xiàn)。
這里也提一下,為了能對于沒有升級到 Java 9 的用戶也能兼容,java.util.concurrent.Flow這個類也被放入了一個 jar 供 Java 9 之前的版本,依賴是:
<dependency><groupId>org.reactivestreams</groupId><artifactId>reactive-streams</artifactId><version>1.0.3</version> </dependency>本系列所講述的 Project Reactor 就是 reactive-streams 的一種實現(xiàn)。 首先,我們先來了解下,什么是響應(yīng)式編程,Java 如何實現(xiàn)
什么是響應(yīng)式編程,Java 如何實現(xiàn)
我們這里用通過唯一 id 獲取知乎的某個回答作為例子,首先我們先明確下,一次HTTP請求到服務(wù)器上處理完之后,將響應(yīng)寫回這次請求的連接,就是完成這次請求了,如下:
public void request(Connection connection, HttpRequest request) {//處理request,省略代碼connection.write(response);//完成響應(yīng) }假設(shè)獲取回答需要調(diào)用兩個接口,獲取評論數(shù)量還有獲取回答信息,傳統(tǒng)的代碼可能會這么去寫:
//獲取評論數(shù)量 public void getCommentCount(Connection connection, HttpRequest request) {Integer commentCount = null;try {//從緩存獲取評論數(shù)量,阻塞IOcommentCount = getCommnetCountFromCache(id);} catch(Exception e) {try {//緩存獲取失敗就從數(shù)據(jù)庫中獲取,阻塞IOcommentCount = getVoteCountFromDB(id);} catch(Exception ex) {}}connection.write(commentCount); }//獲取回答 public void getAnswer(Connection connection, HttpRequest request) {//獲取點贊數(shù)量Integer voteCount = null;try {//從緩存獲取點贊數(shù)量,阻塞IOvoteCount = getVoteCountFromCache(id);} catch(Exception e) {try {//緩存獲取失敗就從數(shù)據(jù)庫中獲取,阻塞IOvoteCount = getVoteCountFromDB(id);} catch(Exception ex) {}}//從數(shù)據(jù)庫獲取回答信息,阻塞IOAnswer answer = getAnswerFromDB(id);//拼裝ResponseResultVO response = new ResultVO();if (voteCount != null) {response.setVoteCount(voteCount);}if (answer != null) {response.setAnswer(answer);}connection.write(response);//完成響應(yīng) }在這種實現(xiàn)下,你的進(jìn)程只需要一個線程池,承載了所有請求。這種實現(xiàn)下,有兩個弊端:
現(xiàn)在,NIO 非阻塞 IO 很普及了,有了非阻塞 IO,我們可以通過響應(yīng)式編程,來讓我們的線程不會阻塞,而是一直在處理請求。這是如何實現(xiàn)的呢?
傳統(tǒng)的 BIO,是線程將數(shù)據(jù)寫入 Connection 之后,當(dāng)前線程進(jìn)入 Block 狀態(tài),直到響應(yīng)返回,之后接著做響應(yīng)返回后的動作。NIO 則是線程將數(shù)據(jù)寫入 Connection 之后,將響應(yīng)返回后需要做的事情以及參數(shù)緩存到一個地方之后,直接返回。在有響應(yīng)返回后,NIO 的 Selector 的 Read 事件會是 Ready 狀態(tài),掃描 Selector 事件的線程,會告訴你的線程池數(shù)據(jù)好了,然后線程池中的某個線程,拿出剛剛緩存的要做的事情還有參數(shù),繼續(xù)處理。
那么,怎樣實現(xiàn)緩存響應(yīng)返回后需要做的事情以及參數(shù)的呢?Java 本身提供了兩種接口,一個是基于回調(diào)的 Callback 接口(Java 8 引入的各種Functional Interface),一種是 Future 框架。
基于 Callback 的實現(xiàn):
//獲取回答 public void getAnswer(Connection connection, HttpRequest request) {ResultVO resultVO = new ResultVO();getVoteCountFromCache(id, (count, throwable) -> {//異常不為null則為獲取失敗if (throwable != null) {//讀取緩存失敗就從數(shù)據(jù)庫獲取getVoteCountFromDB(id, (count2, throwable2) -> {if (throwable2 == null) {resultVO.setVoteCount(voteCount);}//從數(shù)據(jù)庫讀取回答信息getAnswerFromDB(id, (answer, throwable3) -> {if (throwable3 == null) {resultVO.setAnswer(answer);connection.write(resultVO);} else {connection.write(throwable3);}});});} else {//獲取成功,設(shè)置voteCountresultVO.setVoteCount(voteCount);//從數(shù)據(jù)庫讀取回答信息getAnswerFromDB(id, (answer, throwable2) -> {if (throwable2 == null) {resultVO.setAnswer(answer);//返回響應(yīng)connection.write(resultVO);} else {//返回錯誤響應(yīng)connection.write(throwable2);}});}}); }可以看出,隨著調(diào)用層級的加深,callback 層級越來越深,越來越難寫,而且啰嗦的代碼很多。并且,基于 CallBack 想實現(xiàn)獲取點贊數(shù)量其實和獲取回答信息并發(fā)是很難寫的,這里還是先獲取點贊數(shù)量之后再獲取回答信息。
那么基于 Future 呢?我們用 Java 8 之后引入的 CompletableFuture 來試著實現(xiàn)下。
//獲取回答 public void getAnswer(Connection connection, HttpRequest request) {ResultVO resultVO = new ResultVO();//所有的異步任務(wù)都執(zhí)行完之后要做的事情CompletableFuture.allOf(getVoteCountFromCache(id)//發(fā)生異常,從數(shù)據(jù)庫讀取.exceptionallyComposeAsync(throwable -> getVoteCountFromDB(id))//讀取完之后,設(shè)置VoteCount.thenAccept(voteCount -> {resultVO.setVoteCount(voteCount);}),getAnswerFromDB(id).thenAccept(answer -> {resultVO.setAnswer(answer);})).exceptionallyAsync(throwable -> {connection.write(throwable);}).thenRun(() -> {connection.write(resultVO);}); }這種實現(xiàn)就看上去簡單多了,并且讀取點贊數(shù)量還有讀取回答內(nèi)容是同時進(jìn)行的。 Project Reactor 在 Completableuture 這種實現(xiàn)的基礎(chǔ)上,增加了更多的組合方式以及更完善的異常處理機制,以及面對背壓時候的處理機制,還有重試機制。
響應(yīng)式編程里面遇到的問題 - 背壓
由于響應(yīng)式編程,不阻塞,所以把之前因為基本不會發(fā)生而忽視的一個問題帶了上來,就是背壓(Back Pressure)。
背壓是指,當(dāng)上游請求過多,下游服務(wù)來不及響應(yīng),導(dǎo)致 Buffer 溢出的這樣一個問題。在響應(yīng)式編程,由于線程不阻塞,遇到 IO 就會把當(dāng)前參數(shù)和要做的事情緩存起來,這樣無疑增大了很多吞吐量,同時內(nèi)存占用也大了起來,如果不限制的話,很可能 OutOfMemory,這就是背壓問題。
在這個問題上,Project Reactor 基于的模型,是有處理方式的,Completableuture 這個體系里面沒有。
為何現(xiàn)在響應(yīng)式編程在業(yè)務(wù)開發(fā)微服務(wù)開發(fā)不普及
主要因為數(shù)據(jù)庫 IO,不是 NIO。
不論是Java自帶的Future框架,還是 Spring WebFlux,還是 Vert.x,他們都是一種非阻塞的基于Ractor模型的框架(后兩個框架都是利用netty實現(xiàn))。
在阻塞編程模式里,任何一個請求,都需要一個線程去處理,如果io阻塞了,那么這個線程也會阻塞在那。但是在非阻塞編程里面,基于響應(yīng)式的編程,線程不會被阻塞,還可以處理其他請求。舉一個簡單例子:假設(shè)只有一個線程池,請求來的時候,線程池處理,需要讀取數(shù)據(jù)庫 IO,這個 IO 是 NIO 非阻塞 IO,那么就將請求數(shù)據(jù)寫入數(shù)據(jù)庫連接,直接返回。之后數(shù)據(jù)庫返回數(shù)據(jù),這個鏈接的 Selector 會有 Read 事件準(zhǔn)備就緒,這時候,再通過這個線程池去讀取數(shù)據(jù)處理(相當(dāng)于回調(diào)),這時候用的線程和之前不一定是同一個線程。這樣的話,線程就不用等待數(shù)據(jù)庫返回,而是直接處理其他請求。這樣情況下,即使某個業(yè)務(wù) SQL 的執(zhí)行時間長,也不會影響其他業(yè)務(wù)的執(zhí)行。
但是,這一切的基礎(chǔ),是 IO 必須是非阻塞 IO,也就是 NIO(或者 AIO)。官方JDBC沒有 NIO,只有 BIO 實現(xiàn)。這樣無法讓線程將請求寫入鏈接之后直接返回,必須等待響應(yīng)。但是也就解決方案,就是通過其他線程池,專門處理數(shù)據(jù)庫請求并等待返回進(jìn)行回調(diào),也就是業(yè)務(wù)線程池 A 將數(shù)據(jù)庫 BIO 請求交給線程池B處理,讀取完數(shù)據(jù)之后,再交給 A 執(zhí)行剩下的業(yè)務(wù)邏輯。這樣A也不用阻塞,可以處理其他請求。但是,這樣還是有因為某個業(yè)務(wù) SQL 的執(zhí)行時間長,導(dǎo)致B所有線程被阻塞住隊列也滿了從而A的請求也被阻塞的情況,這是不完美的實現(xiàn)。真正完美的,需要 JDBC 實現(xiàn) NIO。
Java 自帶的 Future框架可以這么用JDBC:
@GetMapping public DeferredResult<Result> get() { DeferredResult<Result> deferredResult = new DeferredResult<>(); CompletableFuture.supplyAsync(() -> {return 阻塞數(shù)據(jù)庫IO;//dbThreadPool用來處理阻塞的數(shù)據(jù)庫IO}, dbThreadPool).thenComposeAsync(result -> {//spring 的 DeferredResult 來實現(xiàn)異步回調(diào)寫入結(jié)果返回deferredResult.setResult(result); }); return deferredResult; }WebFlux 也可以使用阻塞JDBC,但是同理:
@GetMapping public Mono<Result> get() { return Mono.fromFuture(CompletableFuture.supplyAsync(() -> {return 阻塞數(shù)據(jù)庫IO;//dbThreadPool用來處理阻塞的數(shù)據(jù)庫IO}, dbThreadPool)); }Vert.x 也可以使用阻塞的JDBC,也是同理:
@GetMapping public DeferredResult<Result> get() { DeferredResult<Result> deferredResult = new DeferredResult<>(); getResultFromDB().setHandler(asyncResult -> {if (asyncResult.succeeded()) {deferredResult.setResult(asyncResult.result());} else {deferredResult.setErrorResult(asyncResult.cause());}}); return deferredResult; }private WorkerExecutor dbThreadPool = vertx.createSharedWorkerExecutor("DB", 16);private Future<Result> getResultFromDB() {Future<Result> result = Future.future();dbThreadPool.executeBlocking(future -> {return 阻塞數(shù)據(jù)庫IO;}, false, asyncResult -> {if (asyncResult.succeeded()) {result.complete(asyncResult.result());} else {result.fail(asyncResult.cause());}});return result; }相當(dāng)于通過另外的線程池(當(dāng)然也可以通過原有線程池,反正就是要用和請求不一樣的線程,才能實現(xiàn)回調(diào),而不是當(dāng)次就阻塞等待),封裝了阻塞 JDBC IO。
但是,這樣幾乎對數(shù)據(jù)庫IO主導(dǎo)的應(yīng)用性能沒有提升,還增加了線程切換,得不償失。所以,需要使用真正實現(xiàn)了 NIO 的數(shù)據(jù)庫客戶端。目前有這些 NIO 的 JDBC 客戶端,但是都不普及:
總結(jié)
以上是生活随笔為你收集整理的response获取响应内容_Project Reactor 深度解析 - 1. 响应式编程介绍,实现以及现有问题的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python效率提升_Python GU
- 下一篇: 圆锥破碎机常见故障_圆锥破碎机飞车危害大