日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Java 并发框架全览,这个牛逼!

發布時間:2025/3/21 java 18 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java 并发框架全览,这个牛逼! 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

來自:唐尤華

https://dzone.com/articles/a-birds-eye-view-on-java-concurrency-frameworks-1

1. 為什么要寫這篇文章

幾年前 NoSQL 開始流行的時候,像其他團隊一樣,我們的團隊也熱衷于令人興奮的新東西,并且計劃替換一個應用程序的數據庫。 但是,當深入實現細節時,我們想起了一位智者曾經說過的話:“細節決定成敗”。最終我們意識到 NoSQL 不是解決所有問題的銀彈,而 NoSQL vs RDMS 的答案是:“視情況而定”。 類似地,去年RxJava 和 Spring Reactor 這樣的并發庫加入了讓人充滿激情的語句,如異步非阻塞方法等。為了避免再犯同樣的錯誤,我們嘗試評估諸如 ExecutorService、 RxJava、Disruptor 和 Akka 這些并發框架彼此之間的差異,以及如何確定各自框架的正確用法。

本文中用到的術語在這里有更詳細的描述。

2. 分析并發框架的示例用例

3. 快速更新線程配置

在開始比較并發框架的之前,讓我們快速復習一下如何配置最佳線程數以提高并行任務的性能。 這個理論適用于所有框架,并且在所有框架中使用相同的線程配置來度量性能。

  • 對于內存任務,線程的數量大約等于具有最佳性能的內核的數量,盡管它可以根據各自處理器中的超線程特性進行一些更改。

    • 例如,在8核機器中,如果對應用程序的每個請求都必須在內存中并行執行4個任務,那么這臺機器上的負載應該保持為?@2 req/sec,在?ThreadPool?中保持8個線程。

  • 對于 I/O 任務,ExecutorService?中配置的線程數應該取決于外部服務的延遲。

    • 與內存中的任務不同,I/O 任務中涉及的線程將被阻塞,并處于等待狀態,直到外部服務響應或超時。 因此,當涉及 I/O 任務線程被阻塞時,應該增加線程的數量,以處理來自并發請求的額外負載。

    • I/O 任務的線程數應該以保守的方式增加,因為處于活動狀態的許多線程帶來了上下文切換的成本,這將影響應用程序的性能。 為了避免這種情況,應該根據 I/O 任務中涉及的線程的等待時間按比例增加此機器的線程的確切數量以及負載。

4. 性能測試結果

性能測試配置 GCP -> 處理器:Intel(R) Xeon(R) CPU @ 2.30GHz;架構:x86_64;CPU 內核:8個(注意: 這些結果僅對該配置有意義,并不表示一個框架比另一個框架更好)。

5. 使用執行器服務并行化 IO 任務

5.1 何時使用?

如果一個應用程序部署在多個節點上,并且每個節點的 req/sec 小于可用的核心數量,那么?ExecutorService?可用于并行化任務,更快地執行代碼。

5.2 什么時候適用?

如果一個應用程序部署在多個節點上,并且每個節點的 req/sec 遠遠高于可用的核心數量,那么使用?ExecutorService?進一步并行化只會使情況變得更糟。

當外部服務延遲增加到 400ms 時,性能測試結果如下(請求速率 @50 req/sec,8核)。

5.3 所有任務按順序執行示例

// I/O 任務:調用外部服務 String posts = JsonService.getPosts(); String comments = JsonService.getComments(); String albums = JsonService.getAlbums(); String photos = JsonService.getPhotos();// 合并來自外部服務的響應 // (內存中的任務將作為此操作的一部分執行) int userId = new Random().nextInt(10) + 1; String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments); String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);// 構建最終響應并將其發送回客戶端 String response = postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser; return response;

5.4 I/O 任務與 ExecutorService 并行執行代碼示例

// 添加 I/O 任務 List<Callable<String>> ioCallableTasks = new ArrayList<>(); ioCallableTasks.add(JsonService::getPosts); ioCallableTasks.add(JsonService::getComments); ioCallableTasks.add(JsonService::getAlbums); ioCallableTasks.add(JsonService::getPhotos);// 調用所有并行任務 ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize); List<Future<String>> futuresOfIOTasks = ioExecutorService.invokeAll(ioCallableTasks);// 獲取 I/O ?操作(阻塞調用)結果 String posts = futuresOfIOTasks.get(0).get(); String comments = futuresOfIOTasks.get(1).get(); String albums = futuresOfIOTasks.get(2).get(); String photos = futuresOfIOTasks.get(3).get();// 合并響應(內存中的任務是此操作的一部分) String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments); String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);// 構建最終響應并將其發送回客戶端 return postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;

6. 使用執行器服務并行化 IO 任務(CompletableFuture)

與上述情況類似:處理傳入請求的 HTTP 線程被阻塞,而 CompletableFuture 用于處理并行任務

6.1 何時使用?

如果沒有?AsyncResponse,性能與?ExecutorService?相同。 如果多個 API 調用必須異步并且鏈接起來,那么這種方法更好(類似 Node 中的 Promises)。

ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);// I/O 任務 CompletableFuture<String> postsFuture = CompletableFuture.supplyAsync(JsonService::getPosts, ioExecutorService); CompletableFuture<String> commentsFuture = CompletableFuture.supplyAsync(JsonService::getComments,ioExecutorService); CompletableFuture<String> albumsFuture = CompletableFuture.supplyAsync(JsonService::getAlbums,ioExecutorService); CompletableFuture<String> photosFuture = CompletableFuture.supplyAsync(JsonService::getPhotos,ioExecutorService); CompletableFuture.allOf(postsFuture, commentsFuture, albumsFuture, photosFuture).get();// 從 I/O 任務(阻塞調用)獲得響應 String posts = postsFuture.get(); String comments = commentsFuture.get(); String albums = albumsFuture.get(); String photos = photosFuture.get();// 合并響應(內存中的任務將是此操作的一部分) String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments); String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);// 構建最終響應并將其發送回客戶端 return postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;

7. 使用 ExecutorService 并行處理所有任務

使用?ExecutorService?并行處理所有任務,并使用?@suspended AsyncResponse response?以非阻塞方式發送響應。

  • HTTP 線程處理傳入請求的連接,并將處理傳遞給 Executor Pool,當所有任務完成后,另一個 HTTP 線程將把響應發送回客戶端(異步非阻塞)。

  • 性能下降原因:

    • 在同步通信中,盡管 I/O 任務中涉及的線程被阻塞,但是只要進程有額外的線程來承擔并發請求負載,它仍然處于運行狀態。

    • 因此,以非阻塞方式保持線程所帶來的好處非常少,而且在此模式中處理請求所涉及的成本似乎很高。

    • 通常,對這里討論采用的例子使用異步非阻塞方法會降低應用程序的性能。

7.1 何時使用?

如果用例類似于服務器端聊天應用程序,在客戶端響應之前,線程不需要保持連接,那么異步、非阻塞方法比同步通信更受歡迎。在這些用例中,系統資源可以通過異步、非阻塞方法得到更好的利用,而不僅僅是等待。

// 為異步執行提交并行任務 ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize); CompletableFuture<String> postsFuture = CompletableFuture.supplyAsync(JsonService::getPosts, ioExecutorService); CompletableFuture<String> commentsFuture = CompletableFuture.supplyAsync(JsonService::getComments, ioExecutorService); CompletableFuture<String> albumsFuture = CompletableFuture.supplyAsync(JsonService::getAlbums, ioExecutorService); CompletableFuture<String> photosFuture = CompletableFuture.supplyAsync(JsonService::getPhotos, ioExecutorService);// 當 /posts API 返回響應時,它將與來自 /comments API 的響應結合在一起 // 作為這個操作的一部分,將執行內存中的一些任務 CompletableFuture<String> postsAndCommentsFuture = postsFuture.thenCombineAsync(commentsFuture, (posts, comments) -> ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments), ioExecutorService);// 當 /albums API 返回響應時,它將與來自 /photos API 的響應結合在一起 // 作為這個操作的一部分,將執行內存中的一些任務 CompletableFuture<String> albumsAndPhotosFuture = albumsFuture.thenCombineAsync(photosFuture, (albums, photos) -> ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos), ioExecutorService);// 構建最終響應并恢復 http 連接,把響應發送回客戶端 postsAndCommentsFuture.thenAcceptBothAsync(albumsAndPhotosFuture, (s1, s2) -> { LOG.info("Building Async Response in Thread " + Thread.currentThread().getName()); String response = s1 + s2; asyncHttpResponse.resume(response); }, ioExecutorService);

8. RxJava

  • 這與上面的情況類似,唯一的區別是 RxJava 提供了更好的 DSL 可以進行流式編程,下面的例子中沒有體現這一點。

  • 性能優于?CompletableFuture?處理并行任務。

8.1 何時使用?

如果編碼的場景適合異步非阻塞方式,那么可以首選 RxJava 或任何響應式開發庫。 還具有諸如 back-pressure 之類的附加功能,可以在生產者和消費者之間平衡負載。

int userId = new Random().nextInt(10) + 1; ExecutorService executor = CustomThreads.getExecutorService(8);// I/O 任務 Observable<String> postsObservable = Observable.just(userId).map(o -> JsonService.getPosts()) .subscribeOn(Schedulers.from(executor)); Observable<String> commentsObservable = Observable.just(userId).map(o -> JsonService.getComments()) .subscribeOn(Schedulers.from(executor)); Observable<String> albumsObservable = Observable.just(userId).map(o -> JsonService.getAlbums()) .subscribeOn(Schedulers.from(executor)); Observable<String> photosObservable = Observable.just(userId).map(o -> JsonService.getPhotos()) .subscribeOn(Schedulers.from(executor));// 合并來自 /posts 和 /comments API 的響應 // 作為這個操作的一部分,將執行內存中的一些任務 Observable<String> postsAndCommentsObservable = Observable .zip(postsObservable, commentsObservable, (posts, comments) -> ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments)) .subscribeOn(Schedulers.from(executor));// 合并來自 /albums 和 /photos API 的響應 // 作為這個操作的一部分,將執行內存中的一些任務 Observable<String> albumsAndPhotosObservable = Observable .zip(albumsObservable, photosObservable, (albums, photos) -> ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos)) .subscribeOn(Schedulers.from(executor));// 構建最終響應 Observable.zip(postsAndCommentsObservable, albumsAndPhotosObservable, (r1, r2) -> r1 + r2) .subscribeOn(Schedulers.from(executor)) .subscribe((response) -> asyncResponse.resume(response), e -> asyncResponse.resume("error"));

9. Disruptor

[Queue vs RingBuffer]

?

  • 在本例中,HTTP 線程將被阻塞,直到 disruptor 完成任務,并且使用?countdowlatch?將 HTTP 線程與?ExecutorService?中的線程同步。

  • 這個框架的主要特點是在沒有任何鎖的情況下處理線程間通信。在 ExecutorService 中,生產者和消費者之間的數據將通過?Queue傳遞,在生產者和消費者之間的數據傳輸過程中涉及到一個鎖。 Disruptor 框架通過一個名為 Ring Buffer 的數據結構(它是循環數組隊列的擴展版本)來處理這種生產者-消費者通信,并且不需要任何鎖。

  • 這個庫不適用于我們在這里討論的這種用例。僅出于好奇而添加。

9.1 何時使用?

Disruptor 框架在下列場合性能更好:與事件驅動的體系結構一起使用,或主要關注內存任務的單個生產者和多個消費者。

static {int userId = new Random().nextInt(10) + 1;// 示例 Event-Handler; count down latch 用于使線程與 http 線程同步EventHandler<Event> postsApiHandler = (event, sequence, endOfBatch) -> {event.posts = JsonService.getPosts();event.countDownLatch.countDown();};// 配置 Disputor 用于處理事件DISRUPTOR.handleEventsWith(postsApiHandler, commentsApiHandler, albumsApiHandler).handleEventsWithWorkerPool(photosApiHandler1, photosApiHandler2).thenHandleEventsWithWorkerPool(postsAndCommentsResponseHandler1, postsAndCommentsResponseHandler2).handleEventsWithWorkerPool(albumsAndPhotosResponseHandler1, albumsAndPhotosResponseHandler2);DISRUPTOR.start(); }// 對于每個請求,在 RingBuffer 中發布一個事件: Event event = null; RingBuffer<Event> ringBuffer = DISRUPTOR.getRingBuffer(); long sequence = ringBuffer.next(); CountDownLatch countDownLatch = new CountDownLatch(6); try {event = ringBuffer.get(sequence);event.countDownLatch = countDownLatch;event.startTime = System.currentTimeMillis(); } finally {ringBuffer.publish(sequence); } try {event.countDownLatch.await(); } catch (InterruptedException e) {e.printStackTrace(); }

10. Akka

  • Akka 庫的主要優勢在于它擁有構建分布式系統的本地支持。

  • 它運行在一個叫做 Actor System 的系統上。這個系統抽象了線程的概念,Actor System 中的 Actor 通過異步消息進行通信,這類似于生產者和消費者之間的通信。

  • 這種額外的抽象級別有助于 Actor System 提供諸如容錯、位置透明等特性。

  • 使用正確的 Actor-to-Thread 策略,可以對該框架進行優化,使其性能優于上表所示的結果。 雖然它不能在單個節點上與傳統方法的性能匹敵,但是由于其構建分布式和彈性系統的能力,仍然是首選。

10.1 示例代碼

// 來自 controller : Actors.masterActor.tell(new Master.Request("Get Response", event, Actors.workerActor), ActorRef.noSender());// handler : public Receive createReceive() {return receiveBuilder().match(Request.class, request -> {Event event = request.event; // Ideally, immutable data structures should be used here.request.worker.tell(new JsonServiceWorker.Request("posts", event), getSelf());request.worker.tell(new JsonServiceWorker.Request("comments", event), getSelf());request.worker.tell(new JsonServiceWorker.Request("albums", event), getSelf());request.worker.tell(new JsonServiceWorker.Request("photos", event), getSelf());}).match(Event.class, e -> {if (e.posts != null && e.comments != null & e.albums != null & e.photos != null) {int userId = new Random().nextInt(10) + 1;String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, e.posts,e.comments);String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, e.albums,e.photos);String response = postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;e.response = response;e.countDownLatch.countDown();}}).build(); }

11. 總結

  • 根據機器的負載決定 Executor 框架的配置,并檢查是否可以根據應用程序中并行任務的數量進行負載平衡。

  • 對于大多數傳統應用程序來說,使用響應式開發庫或任何異步庫都會降低性能。只有當用例類似于服務器端聊天應用程序時,這個模式才有用,其中線程在客戶機響應之前不需要保留連接。

  • Disruptor 框架在與事件驅動的架構模式一起使用時性能很好; 但是當 Disruptor 模式與傳統架構混合使用時,就我們在這里討論的用例而言,它并不符合標準。 這里需要注意的是,Akka 和 Disruptor 庫值得單獨寫一篇文章,介紹如何使用它們來實現事件驅動的架構模式。

  • 這篇文章的源代碼可以在?GitHub?上找到。

總結

以上是生活随笔為你收集整理的Java 并发框架全览,这个牛逼!的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。