日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

具有CompletableFuture的异步超时

發布時間:2023/12/3 编程问答 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 具有CompletableFuture的异步超时 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

有一天,我重寫了執行不佳的多線程代碼,該代碼在Future.get()某個時刻被阻塞:

public void serve() throws InterruptedException, ExecutionException, TimeoutException {final Future<Response> responseFuture = asyncCode();final Response response = responseFuture.get(1, SECONDS);send(response); }private void send(Response response) {//... }

這實際上是一個用Java編寫的Akka應用程序,具有1000個線程的線程池(原文如此!)–所有這些都在此get()調用中被阻塞。 否則系統無法跟上并發請求的數量。 重構之后,我們擺脫了所有這些線程,只引入了一個,大大減少了內存占用。 讓我們簡化一下并顯示Java 8中的示例。第一步是引入CompletableFuture而不是普通的Future (請參閱提示9 )。 很簡單,如果:

  • 您可以控制如何將任務提交給ExecutorService :只需使用CompletableFuture.supplyAsync(..., executorService)而不是executorService.submit(...)
  • 您處理基于回調的API:使用Promise

否則(如果您已經阻塞了API或Future<T> ),將有一些線程被阻塞。 這就是為什么現在誕生了這么多異步API的原因。 假設我們以某種方式重寫了代碼以接收CompletableFuture :

public void serve() throws InterruptedException, ExecutionException, TimeoutException {final CompletableFuture<Response> responseFuture = asyncCode();final Response response = responseFuture.get(1, SECONDS);send(response); }

顯然,這并不能解決任何問題,我們必須利用新的反應式編程風格:

public void serve() {final CompletableFuture<Response> responseFuture = asyncCode();responseFuture.thenAccept(this::send); }

這在功能上是等效的,但是現在serve()應該立即運行(沒有阻塞或等待)。 只要記住, this::send將在完成responseFuture的同一線程中執行。 如果您不想在某個地方重載某些任意線程池或send()昂貴,請考慮為此使用單獨的線程池: thenAcceptAsync(this::send, sendPool) 。 很好,但是我們失去了兩個重要的屬性:錯誤傳播和超時。 由于我們更改了API,因此錯誤傳播很難。 當serve()方法退出時,異步操作可能尚未完成。 如果您關心異常,請考慮返回responseFuture或其他替代機制。 至少,請記錄異常,因為否則它將被吞噬:

final CompletableFuture<Response> responseFuture = asyncCode(); responseFuture.exceptionally(throwable -> {log.error("Unrecoverable error", throwable);return null; });

請注意上面的代碼: exceptionally()嘗試從故障中恢復 ,并返回替代結果。 它在這里有效,但是如果您將thenAccept() exceptionally()與thenAccept() ,即使在失敗的情況下, send()也會被調用,但是參數為null (或者我們從exceptionally()返回的值exceptionally() :

responseFuture.exceptionally(throwable -> {log.error("Unrecoverable error", throwable);return null;}).thenAccept(this::send); //probably not what you think

丟失1秒超時的問題非常微妙。 我們的原始代碼等待(阻塞)最多1秒鐘,直到Future完成。 否則拋出TimeoutException 。 我們失去了此功能,甚至超時的更糟糕的單元測試也不方便并且經常被跳過。 為了在不犧牲事件驅動精神的前提下實現超時,我們需要一個額外的構建塊:在給定時間之后始終失敗的未來:

public static <T> CompletableFuture<T> failAfter(Duration duration) {final CompletableFuture<T> promise = new CompletableFuture<>();scheduler.schedule(() -> {final TimeoutException ex = new TimeoutException("Timeout after " + duration);return promise.completeExceptionally(ex);}, duration.toMillis(), MILLISECONDS);return promise; }private static final ScheduledExecutorService scheduler =Executors.newScheduledThreadPool(1,new ThreadFactoryBuilder().setDaemon(true).setNameFormat("failAfter-%d").build());

這很簡單:我們創建一個承諾 (沒有基礎任務或線程池的未來),并在給定java.time.Duration之后使用TimeoutException完成它。 如果您get()某個地方get()這樣的未來,則阻塞了至少那么多時間后,將拋出TimeoutException 。 實際上,它將是ExecutionException包裝TimeoutException ,沒有辦法解決。 請注意,我僅使用一個線程使用固定scheduler線程池。 這不僅是出于教育目的:“在這種情況下,“ 1個線程對于任何人都應該足夠 ”” [1] 。 failAfter()本身是沒有用的,但是將其與我們的responseFuture結合起來,我們就有了解決方案!

final CompletableFuture<Response> responseFuture = asyncCode(); final CompletableFuture<Response> oneSecondTimeout = failAfter(Duration.ofSeconds(1)); responseFuture.acceptEither(oneSecondTimeout, this::send).exceptionally(throwable -> {log.error("Problem", throwable);return null;});

這里發生了很多事情。 在通過我們的后臺任務接收到responseFuture ,我們還創建了一個“合成的” oneSecondTimeout將來,它將永遠不會成功完成,但總是在1秒后失敗。 現在,我們通過調用acceptEither合并兩者。 該運算符將針對第一個完成的將來( responseFuture或oneSecondTimeout執行代碼塊,而只是忽略較慢的代碼的結果。 如果asyncCode()內1完成第二this::send將被調用,并從異常oneSecondTimeout會被忽略。 然而! 如果asyncCode()確實很慢,則oneSecondTimeout啟動。 但是由于它失敗并帶有異常,因此將調用exceptionally錯誤處理程序,而不是this::send 。 您可以認為send()或exceptionally都將被調用,而不是兩者都被調用。 當然,如果我們有兩個正常完成的“普通”期貨,則將調用前一個的響應來調用send() ,并丟棄后者。

這不是最干凈的解決方案。 一個干凈的人會包裝原始的未來,并確保它在給定的時間內完成。 此類操作符可在com.twitter.util.Future (Scala;稱為com.twitter.util.Future ( within() )中使用,但是在scala.concurrent.Future丟失(可能是受前者啟發)。 讓我們留下Scala并為CompletableFuture實現類似的運算符。 它以一個Future作為輸入,并返回一個在基礎底層完成時完成的Future。 但是,如果完成基礎未來花費的時間太長,則會引發異常:

public static <T> CompletableFuture<T> within(CompletableFuture<T> future, Duration duration) {final CompletableFuture<T> timeout = failAfter(duration);return future.applyToEither(timeout, Function.identity()); }

這導致了最終,清潔和靈活的解決方案:

final CompletableFuture<Response> responseFuture = within(asyncCode(), Duration.ofSeconds(1)); responseFuture.thenAccept(this::send).exceptionally(throwable -> {log.error("Unrecoverable error", throwable);return null;});

希望您喜歡這篇文章,因為您可以看到Java中的反應式編程已不再是未來的事情(無雙關語)。

翻譯自: https://www.javacodegeeks.com/2014/12/asynchronous-timeouts-with-completablefuture.html

總結

以上是生活随笔為你收集整理的具有CompletableFuture的异步超时的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。