来,带你鸟瞰 Java 中的并发框架!
來(lái)自 ImportNew,作者:唐尤華
https://dzone.com/articles/a-birds-eye-view-on-java-concurrency-frameworks-1
1. 為什么要寫這篇文章
幾年前 NoSQL 開(kāi)始流行的時(shí)候,像其他團(tuán)隊(duì)一樣,我們的團(tuán)隊(duì)也熱衷于令人興奮的新東西,并且計(jì)劃替換一個(gè)應(yīng)用程序的數(shù)據(jù)庫(kù)。 但是,當(dāng)深入實(shí)現(xiàn)細(xì)節(jié)時(shí),我們想起了一位智者曾經(jīng)說(shuō)過(guò)的話:“細(xì)節(jié)決定成敗”。最終我們意識(shí)到 NoSQL 不是解決所有問(wèn)題的銀彈,而 NoSQL vs RDMS 的答案是:“視情況而定”。?
類似地,去年RxJava 和 Spring Reactor 這樣的并發(fā)庫(kù)加入了讓人充滿激情的語(yǔ)句,如異步非阻塞方法等。為了避免再犯同樣的錯(cuò)誤,我們嘗試評(píng)估諸如 ExecutorService、 RxJava、Disruptor 和 Akka 這些并發(fā)框架彼此之間的差異,以及如何確定各自框架的正確用法。
本文中用到的術(shù)語(yǔ)在這里有更詳細(xì)的描述。
?
2. 分析并發(fā)框架的示例用例
?
3. 快速更新線程配置
在開(kāi)始比較并發(fā)框架的之前,讓我們快速?gòu)?fù)習(xí)一下如何配置最佳線程數(shù)以提高并行任務(wù)的性能。 這個(gè)理論適用于所有框架,并且在所有框架中使用相同的線程配置來(lái)度量性能。
-
對(duì)于內(nèi)存任務(wù),線程的數(shù)量大約等于具有最佳性能的內(nèi)核的數(shù)量,盡管它可以根據(jù)各自處理器中的超線程特性進(jìn)行一些更改。
-
例如,在8核機(jī)器中,如果對(duì)應(yīng)用程序的每個(gè)請(qǐng)求都必須在內(nèi)存中并行執(zhí)行4個(gè)任務(wù),那么這臺(tái)機(jī)器上的負(fù)載應(yīng)該保持為?@2 req/sec,在?ThreadPool?中保持8個(gè)線程。
-
-
對(duì)于 I/O 任務(wù),ExecutorService?中配置的線程數(shù)應(yīng)該取決于外部服務(wù)的延遲。
-
與內(nèi)存中的任務(wù)不同,I/O 任務(wù)中涉及的線程將被阻塞,并處于等待狀態(tài),直到外部服務(wù)響應(yīng)或超時(shí)。 因此,當(dāng)涉及 I/O 任務(wù)線程被阻塞時(shí),應(yīng)該增加線程的數(shù)量,以處理來(lái)自并發(fā)請(qǐng)求的額外負(fù)載。
-
I/O 任務(wù)的線程數(shù)應(yīng)該以保守的方式增加,因?yàn)樘幱诨顒?dòng)狀態(tài)的許多線程帶來(lái)了上下文切換的成本,這將影響應(yīng)用程序的性能。 為了避免這種情況,應(yīng)該根據(jù) I/O 任務(wù)中涉及的線程的等待時(shí)間按比例增加此機(jī)器的線程的確切數(shù)量以及負(fù)載。
-
參考:?http://baddotrobot.com/blog/2013/06/01/optimum-number-of-threads/
?
4. 性能測(cè)試結(jié)果
性能測(cè)試配置 GCP -> 處理器:Intel(R) Xeon(R) CPU @ 2.30GHz;架構(gòu):x86_64;CPU 內(nèi)核:8個(gè)(注意: 這些結(jié)果僅對(duì)該配置有意義,并不表示一個(gè)框架比另一個(gè)框架更好)。
?
5. 使用執(zhí)行器服務(wù)并行化 IO 任務(wù)
5.1 何時(shí)使用?
如果一個(gè)應(yīng)用程序部署在多個(gè)節(jié)點(diǎn)上,并且每個(gè)節(jié)點(diǎn)的 req/sec 小于可用的核心數(shù)量,那么?ExecutorService?可用于并行化任務(wù),更快地執(zhí)行代碼。
5.2 什么時(shí)候適用?
如果一個(gè)應(yīng)用程序部署在多個(gè)節(jié)點(diǎn)上,并且每個(gè)節(jié)點(diǎn)的 req/sec 遠(yuǎn)遠(yuǎn)高于可用的核心數(shù)量,那么使用?ExecutorService?進(jìn)一步并行化只會(huì)使情況變得更糟。
當(dāng)外部服務(wù)延遲增加到 400ms 時(shí),性能測(cè)試結(jié)果如下(請(qǐng)求速率 @50 req/sec,8核)。
5.3 所有任務(wù)按順序執(zhí)行示例
// I/O 任務(wù):調(diào)用外部服務(wù) String posts = JsonService.getPosts(); String comments = JsonService.getComments(); String albums = JsonService.getAlbums(); String photos = JsonService.getPhotos();// 合并來(lái)自外部服務(wù)的響應(yīng) // (內(nèi)存中的任務(wù)將作為此操作的一部分執(zhí)行) int userId = new Random().nextInt(10) + 1; String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments); String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);// 構(gòu)建最終響應(yīng)并將其發(fā)送回客戶端 String response = postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser; return response;5.4 I/O 任務(wù)與 ExecutorService 并行執(zhí)行代碼示例
// 添加 I/O 任務(wù) List<Callable<String>> ioCallableTasks = new ArrayList<>(); ioCallableTasks.add(JsonService::getPosts); ioCallableTasks.add(JsonService::getComments); ioCallableTasks.add(JsonService::getAlbums); ioCallableTasks.add(JsonService::getPhotos);// 調(diào)用所有并行任務(wù) ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize); List<Future<String>> futuresOfIOTasks = ioExecutorService.invokeAll(ioCallableTasks);// 獲取 I/O ?操作(阻塞調(diào)用)結(jié)果 String posts = futuresOfIOTasks.get(0).get(); String comments = futuresOfIOTasks.get(1).get(); String albums = futuresOfIOTasks.get(2).get(); String photos = futuresOfIOTasks.get(3).get();// 合并響應(yīng)(內(nèi)存中的任務(wù)是此操作的一部分) String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments); String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);// 構(gòu)建最終響應(yīng)并將其發(fā)送回客戶端 return postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;?
6. 使用執(zhí)行器服務(wù)并行化 IO 任務(wù)(CompletableFuture)
與上述情況類似:處理傳入請(qǐng)求的 HTTP 線程被阻塞,而 CompletableFuture 用于處理并行任務(wù)
6.1 何時(shí)使用?
如果沒(méi)有?AsyncResponse,性能與?ExecutorService?相同。 如果多個(gè) API 調(diào)用必須異步并且鏈接起來(lái),那么這種方法更好(類似 Node 中的 Promises)。
ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);// I/O 任務(wù) CompletableFuture<String> postsFuture = CompletableFuture.supplyAsync(JsonService::getPosts, ioExecutorService); CompletableFuture<String> commentsFuture = CompletableFuture.supplyAsync(JsonService::getComments,ioExecutorService); CompletableFuture<String> albumsFuture = CompletableFuture.supplyAsync(JsonService::getAlbums,ioExecutorService); CompletableFuture<String> photosFuture = CompletableFuture.supplyAsync(JsonService::getPhotos,ioExecutorService); CompletableFuture.allOf(postsFuture, commentsFuture, albumsFuture, photosFuture).get();// 從 I/O 任務(wù)(阻塞調(diào)用)獲得響應(yīng) String posts = postsFuture.get(); String comments = commentsFuture.get(); String albums = albumsFuture.get(); String photos = photosFuture.get();// 合并響應(yīng)(內(nèi)存中的任務(wù)將是此操作的一部分) String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments); String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);// 構(gòu)建最終響應(yīng)并將其發(fā)送回客戶端 return postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;?
7. 使用 ExecutorService 并行處理所有任務(wù)
使用?ExecutorService?并行處理所有任務(wù),并使用?@suspended AsyncResponse response?以非阻塞方式發(fā)送響應(yīng)。
圖片來(lái)自?http://tutorials.jenkov.com/java-nio/nio-vs-io.html-
HTTP 線程處理傳入請(qǐng)求的連接,并將處理傳遞給 Executor Pool,當(dāng)所有任務(wù)完成后,另一個(gè) HTTP 線程將把響應(yīng)發(fā)送回客戶端(異步非阻塞)。
-
性能下降原因:
-
在同步通信中,盡管 I/O 任務(wù)中涉及的線程被阻塞,但是只要進(jìn)程有額外的線程來(lái)承擔(dān)并發(fā)請(qǐng)求負(fù)載,它仍然處于運(yùn)行狀態(tài)。
-
因此,以非阻塞方式保持線程所帶來(lái)的好處非常少,而且在此模式中處理請(qǐng)求所涉及的成本似乎很高。
-
通常,對(duì)這里討論采用的例子使用異步非阻塞方法會(huì)降低應(yīng)用程序的性能。
-
7.1 何時(shí)使用?
如果用例類似于服務(wù)器端聊天應(yīng)用程序,在客戶端響應(yīng)之前,線程不需要保持連接,那么異步、非阻塞方法比同步通信更受歡迎。在這些用例中,系統(tǒng)資源可以通過(guò)異步、非阻塞方法得到更好的利用,而不僅僅是等待。
// 為異步執(zhí)行提交并行任務(wù) ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize); CompletableFuture<String> postsFuture = CompletableFuture.supplyAsync(JsonService::getPosts, ioExecutorService); CompletableFuture<String> commentsFuture = CompletableFuture.supplyAsync(JsonService::getComments, ioExecutorService); CompletableFuture<String> albumsFuture = CompletableFuture.supplyAsync(JsonService::getAlbums, ioExecutorService); CompletableFuture<String> photosFuture = CompletableFuture.supplyAsync(JsonService::getPhotos, ioExecutorService);// 當(dāng) /posts API 返回響應(yīng)時(shí),它將與來(lái)自 /comments API 的響應(yīng)結(jié)合在一起 // 作為這個(gè)操作的一部分,將執(zhí)行內(nèi)存中的一些任務(wù) CompletableFuture<String> postsAndCommentsFuture = postsFuture.thenCombineAsync(commentsFuture, (posts, comments) -> ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments), ioExecutorService);// 當(dāng) /albums API 返回響應(yīng)時(shí),它將與來(lái)自 /photos API 的響應(yīng)結(jié)合在一起 // 作為這個(gè)操作的一部分,將執(zhí)行內(nèi)存中的一些任務(wù) CompletableFuture<String> albumsAndPhotosFuture = albumsFuture.thenCombineAsync(photosFuture, (albums, photos) -> ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos), ioExecutorService);// 構(gòu)建最終響應(yīng)并恢復(fù) http 連接,把響應(yīng)發(fā)送回客戶端 postsAndCommentsFuture.thenAcceptBothAsync(albumsAndPhotosFuture, (s1, s2) -> { LOG.info("Building Async Response in Thread " + Thread.currentThread().getName()); String response = s1 + s2; asyncHttpResponse.resume(response); }, ioExecutorService);?
8. RxJava
-
這與上面的情況類似,唯一的區(qū)別是 RxJava 提供了更好的 DSL 可以進(jìn)行流式編程,下面的例子中沒(méi)有體現(xiàn)這一點(diǎn)。
-
性能優(yōu)于?CompletableFuture?處理并行任務(wù)。
8.1 何時(shí)使用?
如果編碼的場(chǎng)景適合異步非阻塞方式,那么可以首選 RxJava 或任何響應(yīng)式開(kāi)發(fā)庫(kù)。 還具有諸如 back-pressure 之類的附加功能,可以在生產(chǎn)者和消費(fèi)者之間平衡負(fù)載。
int userId = new Random().nextInt(10) + 1; ExecutorService executor = CustomThreads.getExecutorService(8);// I/O 任務(wù) Observable<String> postsObservable = Observable.just(userId).map(o -> JsonService.getPosts()) .subscribeOn(Schedulers.from(executor)); Observable<String> commentsObservable = Observable.just(userId).map(o -> JsonService.getComments()) .subscribeOn(Schedulers.from(executor)); Observable<String> albumsObservable = Observable.just(userId).map(o -> JsonService.getAlbums()) .subscribeOn(Schedulers.from(executor)); Observable<String> photosObservable = Observable.just(userId).map(o -> JsonService.getPhotos()) .subscribeOn(Schedulers.from(executor));// 合并來(lái)自 /posts 和 /comments API 的響應(yīng) // 作為這個(gè)操作的一部分,將執(zhí)行內(nèi)存中的一些任務(wù) Observable<String> postsAndCommentsObservable = Observable .zip(postsObservable, commentsObservable, (posts, comments) -> ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments)) .subscribeOn(Schedulers.from(executor));// 合并來(lái)自 /albums 和 /photos API 的響應(yīng) // 作為這個(gè)操作的一部分,將執(zhí)行內(nèi)存中的一些任務(wù) Observable<String> albumsAndPhotosObservable = Observable .zip(albumsObservable, photosObservable, (albums, photos) -> ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos)) .subscribeOn(Schedulers.from(executor));// 構(gòu)建最終響應(yīng) Observable.zip(postsAndCommentsObservable, albumsAndPhotosObservable, (r1, r2) -> r1 + r2) .subscribeOn(Schedulers.from(executor)) .subscribe((response) -> asyncResponse.resume(response), e -> asyncResponse.resume("error"));?
9. Disruptor
[Queue vs RingBuffer]
圖片1:?http://tutorials.jenkov.com/java-concurrency/blocking-queues.html
圖片2:?https://www.baeldung.com/lmax-disruptor-concurrency
-
在本例中,HTTP 線程將被阻塞,直到 disruptor 完成任務(wù),并且使用?countdowlatch?將 HTTP 線程與?ExecutorService?中的線程同步。
-
這個(gè)框架的主要特點(diǎn)是在沒(méi)有任何鎖的情況下處理線程間通信。在 ExecutorService 中,生產(chǎn)者和消費(fèi)者之間的數(shù)據(jù)將通過(guò)?Queue傳遞,在生產(chǎn)者和消費(fèi)者之間的數(shù)據(jù)傳輸過(guò)程中涉及到一個(gè)鎖。 Disruptor 框架通過(guò)一個(gè)名為 Ring Buffer 的數(shù)據(jù)結(jié)構(gòu)(它是循環(huán)數(shù)組隊(duì)列的擴(kuò)展版本)來(lái)處理這種生產(chǎn)者-消費(fèi)者通信,并且不需要任何鎖。
-
這個(gè)庫(kù)不適用于我們?cè)谶@里討論的這種用例。僅出于好奇而添加。
9.1 何時(shí)使用?
Disruptor 框架在下列場(chǎng)合性能更好:與事件驅(qū)動(dòng)的體系結(jié)構(gòu)一起使用,或主要關(guān)注內(nèi)存任務(wù)的單個(gè)生產(chǎn)者和多個(gè)消費(fèi)者。
static {int userId = new Random().nextInt(10) + 1;// 示例 Event-Handler; count down latch 用于使線程與 http 線程同步EventHandler<Event> postsApiHandler = (event, sequence, endOfBatch) -> {event.posts = JsonService.getPosts();event.countDownLatch.countDown();};// 配置 Disputor 用于處理事件DISRUPTOR.handleEventsWith(postsApiHandler, commentsApiHandler, albumsApiHandler).handleEventsWithWorkerPool(photosApiHandler1, photosApiHandler2).thenHandleEventsWithWorkerPool(postsAndCommentsResponseHandler1, postsAndCommentsResponseHandler2).handleEventsWithWorkerPool(albumsAndPhotosResponseHandler1, albumsAndPhotosResponseHandler2);DISRUPTOR.start(); }// 對(duì)于每個(gè)請(qǐng)求,在 RingBuffer 中發(fā)布一個(gè)事件: Event event = null; RingBuffer<Event> ringBuffer = DISRUPTOR.getRingBuffer(); long sequence = ringBuffer.next(); CountDownLatch countDownLatch = new CountDownLatch(6); try {event = ringBuffer.get(sequence);event.countDownLatch = countDownLatch;event.startTime = System.currentTimeMillis(); } finally {ringBuffer.publish(sequence); } try {event.countDownLatch.await(); } catch (InterruptedException e) {e.printStackTrace(); }?
10. Akka
圖片來(lái)自:?https://blog.codecentric.de/en/2015/08/introduction-to-akka-actors/
-
Akka 庫(kù)的主要優(yōu)勢(shì)在于它擁有構(gòu)建分布式系統(tǒng)的本地支持。
-
它運(yùn)行在一個(gè)叫做 Actor System 的系統(tǒng)上。這個(gè)系統(tǒng)抽象了線程的概念,Actor System 中的 Actor 通過(guò)異步消息進(jìn)行通信,這類似于生產(chǎn)者和消費(fèi)者之間的通信。
-
這種額外的抽象級(jí)別有助于 Actor System 提供諸如容錯(cuò)、位置透明等特性。
-
使用正確的 Actor-to-Thread 策略,可以對(duì)該框架進(jìn)行優(yōu)化,使其性能優(yōu)于上表所示的結(jié)果。 雖然它不能在單個(gè)節(jié)點(diǎn)上與傳統(tǒng)方法的性能匹敵,但是由于其構(gòu)建分布式和彈性系統(tǒng)的能力,仍然是首選。
10.1 示例代碼
// 來(lái)自 controller : Actors.masterActor.tell(new Master.Request("Get Response", event, Actors.workerActor), ActorRef.noSender());// handler : public Receive createReceive() {return receiveBuilder().match(Request.class, request -> {Event event = request.event; // Ideally, immutable data structures should be used here.request.worker.tell(new JsonServiceWorker.Request("posts", event), getSelf());request.worker.tell(new JsonServiceWorker.Request("comments", event), getSelf());request.worker.tell(new JsonServiceWorker.Request("albums", event), getSelf());request.worker.tell(new JsonServiceWorker.Request("photos", event), getSelf());}).match(Event.class, e -> {if (e.posts != null && e.comments != null & e.albums != null & e.photos != null) {int userId = new Random().nextInt(10) + 1;String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, e.posts,e.comments);String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, e.albums,e.photos);String response = postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;e.response = response;e.countDownLatch.countDown();}}).build(); }?
11. 總結(jié)
-
根據(jù)機(jī)器的負(fù)載決定 Executor 框架的配置,并檢查是否可以根據(jù)應(yīng)用程序中并行任務(wù)的數(shù)量進(jìn)行負(fù)載平衡。
-
對(duì)于大多數(shù)傳統(tǒng)應(yīng)用程序來(lái)說(shuō),使用響應(yīng)式開(kāi)發(fā)庫(kù)或任何異步庫(kù)都會(huì)降低性能。只有當(dāng)用例類似于服務(wù)器端聊天應(yīng)用程序時(shí),這個(gè)模式才有用,其中線程在客戶機(jī)響應(yīng)之前不需要保留連接。
-
Disruptor 框架在與事件驅(qū)動(dòng)的架構(gòu)模式一起使用時(shí)性能很好; 但是當(dāng) Disruptor 模式與傳統(tǒng)架構(gòu)混合使用時(shí),就我們?cè)谶@里討論的用例而言,它并不符合標(biāo)準(zhǔn)。 這里需要注意的是,Akka 和 Disruptor 庫(kù)值得單獨(dú)寫一篇文章,介紹如何使用它們來(lái)實(shí)現(xiàn)事件驅(qū)動(dòng)的架構(gòu)模式。
-
這篇文章的源代碼可以在?GitHub?上找到。
總結(jié)
以上是生活随笔為你收集整理的来,带你鸟瞰 Java 中的并发框架!的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Kubernetes 2018 年度简史
- 下一篇: Java程序员进阶的 3 个层次,你处于