日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

使用ThreadPoolExecutor并行化独立的单线程任务

發(fā)布時間:2023/12/3 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 使用ThreadPoolExecutor并行化独立的单线程任务 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
Java SE 5.0中引入的任務(wù)執(zhí)行框架是簡化多線程應(yīng)用程序的設(shè)計和開發(fā)的巨大飛躍。 該框架提供了用于管理任務(wù)概念,管理線程生命周期及其執(zhí)行策略的工具。

在此博客文章中,我們將描述該框架的功能,靈活性和簡單性,以展示一個簡單的用例。

基礎(chǔ)

執(zhí)行程序框架引入了一個接口來管理任務(wù)執(zhí)行: 執(zhí)行程序。 Executor是用于提交任務(wù)的接口,表示為Runnable實例。 此接口還將任務(wù)提交與任務(wù)執(zhí)行隔離開來 :具有不同執(zhí)行策略的執(zhí)行者都發(fā)布相同的提交接口:如果您更改執(zhí)行策略,則提交邏輯將不受更改的影響。

如果您想提交一個Runnable實例來執(zhí)行,它很簡單:

Executor exec = …; exec.execute(runnable);

線程池

如上一節(jié)所述,執(zhí)行器合同未指定執(zhí)行器如何執(zhí)行可運行對象:這取決于您所使用的執(zhí)行器的特定類型。 該框架提供了一些不同類型的執(zhí)行器,每種執(zhí)行器都有針對不同用例量身定制的特定執(zhí)行策略。

您將要處理的最常見的執(zhí)行程序類型是線程池執(zhí)行程序 。,它們是ThreadPoolExecutor類(及其子類)的實例。 線程池執(zhí)行程序管理一個線程池 (即將要執(zhí)行任務(wù)的工作線程池)和一個工作隊列 。

您肯定已經(jīng)在其他技術(shù)中看到池的概念。 使用池的主要優(yōu)點是減少了資源創(chuàng)建的開銷,重用了使用后釋放的結(jié)構(gòu)(在這種情況下為線程)。 使用池的另一個隱式優(yōu)勢是可以調(diào)整資源使用量 :可以調(diào)整線程池大小以實現(xiàn)所需的負(fù)載,而不會損害系統(tǒng)資源。

該框架為線程池提供了一個工廠類,稱為Executors 。 使用該工廠,您將能夠創(chuàng)建具有不同特征的線程池。 通常,底層實現(xiàn)通常是相同的( ThreadPoolExecutor ),但是工廠類可幫助您快速配置線程池,而無需使用更復(fù)雜的構(gòu)造函數(shù)。 出廠方法是:

  • newFixedThreadPool :此方法返回最大大小固定的線程池。 它將根據(jù)需要創(chuàng)建新線程,直到最大配置大小。 當(dāng)線程數(shù)達(dá)到最大值時,線程池將保持大小不變??。
  • newCachedThreadPool :此方法返回?zé)o限制的線程池,即沒有最大大小的線程池。 但是,當(dāng)負(fù)載減少時,這種線程池將拆除未使用的線程。
  • newSingleThreadedExecutor :此方法返回一個執(zhí)行程序,該執(zhí)行程序保證將在單個線程中執(zhí)行任務(wù)。
  • newScheduledThreadPool :此方法返回固定大小的線程池,該線程池支持延遲和定時任務(wù)執(zhí)行。

這僅僅是個開始。 執(zhí)行器還提供了本教程中未涵蓋的其他功能,我強烈建議您學(xué)習(xí)以下內(nèi)容:

  • 生命周期管理方法,由ExecutorService接口聲明(例如shutdown ()和awaitTermination ())。
  • 完成服務(wù)可輪詢?nèi)蝿?wù)狀態(tài)并檢索其返回值(如果適用)。

該ExecutorService的接口就顯得尤為重要,因為它提供了一種方法來關(guān)閉一個線程池,這是一件好事,你幾乎肯定希望能夠干凈利落做。 幸運的是, ExecutorService接口非常簡單且易于解釋,我建議您徹底研究其JavaDoc。

基本上,您會向ExecutorService發(fā)送shutdown ()消息,此后它將不接受新提交的任務(wù),但將繼續(xù)處理已排隊的作業(yè)。 您可以使用isTerminated ()來收集執(zhí)行程序服務(wù)的終止?fàn)顟B(tài),也可以使用awaitTermination (…)方法等待終止。 不過, awaitTermination方法不會永遠(yuǎn)等待:您必須將最大等待超時作為參數(shù)傳遞。

警告 :錯誤和混亂的根源是理解為什么JVM進(jìn)程永不退出的原因。 如果不關(guān)閉執(zhí)行程序服務(wù),從而破壞基礎(chǔ)線程,則JVM將永遠(yuǎn)不會退出: JVM在其最后一個非守護(hù)線程退出時退出。

配置ThreadPoolExecutor

如果決定手動創(chuàng)建ThreadPoolExecutor而不是使用Executors工廠類,則需要使用其構(gòu)造函數(shù)之一來創(chuàng)建和配置ThreadPoolExecutor 。 此類的最廣泛的構(gòu)造方法是:

public ThreadPoolExecutor( int corePoolSize, int maxPoolSize, long keepAlive, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler);

如您所見,您可以配置:

  • 核心池大小(線程池將嘗試使用的大小)。
  • 最大池大小。
  • 保持活動時間,在該時間之后,空閑線程有資格被拆除。
  • 工作隊列中包含等待執(zhí)行的任務(wù)。
  • 拒絕任務(wù)提交時應(yīng)用的策略。

限制排隊的任務(wù)數(shù)

在可預(yù)測性和穩(wěn)定性方面,限制正在執(zhí)行的并發(fā)任務(wù)的數(shù)量,調(diào)整線程池的大小對您的應(yīng)用程序及其執(zhí)行環(huán)境具有巨大的好處:無限制的線程創(chuàng)建最終將耗盡運行時資源,結(jié)果您的應(yīng)用程序可能會遇到嚴(yán)重的性能問題,甚至可能導(dǎo)致應(yīng)用程序不穩(wěn)定。

這只是解決部分問題的一種解決方案:您限制了正在執(zhí)行的任務(wù)數(shù)量,但沒有限制可以提交并排隊供以后執(zhí)行的作業(yè)數(shù)量。 該應(yīng)用程序?qū)⒃谝院笥龅劫Y源短缺的問題,但是如果提交率始終超過執(zhí)行率,它將最終遇到這種情況。

該問題的解決方案是:

  • 向執(zhí)行者提供阻塞隊列以保留等待的任務(wù)。 如果隊列已滿,提交的任務(wù)將被“拒絕”。
  • 當(dāng)拒絕任務(wù)提交時,將調(diào)用RejectedExecutionHandler ,這就是為什么在上一項中引用了被拒絕的動詞的原因。 您可以實施自己的拒絕策略,也可以使用框架提供的內(nèi)置策略之一。

默認(rèn)拒絕策略使執(zhí)行程序拋出RejectedExecutionException 。 但是,其他內(nèi)置策略可讓您:

  • 靜默丟棄作業(yè)。
  • 丟棄最舊的作業(yè),然后嘗試重新提交最后一份。
  • 在調(diào)用者的線程上執(zhí)行被拒絕的任務(wù)。

什么時候以及為什么要使用這樣的線程池配置? 讓我們來看一個例子。

一個示例:并行化獨立的單線程任務(wù)

最近,有人打電話給我解決我的客戶自很久以前就在運行的一項舊工作的問題。 基本上,作業(yè)由等待一組目錄層次結(jié)構(gòu)上的文件系統(tǒng)事件的組件組成。 每當(dāng)觸發(fā)事件時,都必須處理文件。 文件處理由專有的單線程進(jìn)程執(zhí)行。 說實話,就其本身的性質(zhì)而言,即使我可以,但如果我可以并行化它,我就不會。 事件全天的到達(dá)率很高,不需要實時處理文件,而只需要在第二天之前進(jìn)行處理即可。

當(dāng)前的實現(xiàn)是技術(shù)的混合與匹配,包括UNIX shell腳本,該腳本負(fù)責(zé)掃描巨大的目錄層次結(jié)構(gòu)以檢測應(yīng)用更改的位置。 實施該實現(xiàn)后,執(zhí)行環(huán)境中的核心數(shù)量也就只有兩個。 同樣,事件的發(fā)生率也很低:如今,它們的數(shù)量級約為數(shù)百萬 ,總共要處理1到2 TB的原始數(shù)據(jù)。

如今,客戶端正在運行這些進(jìn)程的服務(wù)器是十二臺核心計算機:這是并行化那些舊的單線程任務(wù)的巨大機會。 我們已經(jīng)基本掌握了配方的所有成分,我們只需要決定如何構(gòu)建和調(diào)整它即可。 在編寫任何代碼之前,需要進(jìn)行一些思考以了解負(fù)載的性質(zhì),這些是我檢測到的約束:

  • 定期要掃描大量文件:每個目錄包含一到兩百萬個文件。
  • 掃描算法非常快,可以并行化。
  • 處理文件至少需要1秒,甚至可能需要2或3秒的峰值。
  • 處理文件時,除CPU外沒有其他瓶頸。
  • CPU使用率必須是可調(diào)的,以便根據(jù)一天中的時間使用不同的負(fù)載配置文件。

因此,我需要一個線程池,該線程池的大小由調(diào)用流程時活動的負(fù)載配置文件確定。 然后,我傾向于創(chuàng)建根據(jù)負(fù)載策略配置的固定大小的線程池執(zhí)行程序。 由于處理線程僅受CPU限制,其核心使用率為100%,并且無需等待其他資源,因此負(fù)載策略非常容易計算:只需獲取處理環(huán)境中可用的核心數(shù)量,然后使用負(fù)載按比例縮小當(dāng)時處于活動狀態(tài)的因素(并檢查在峰值時刻至少使用了一個內(nèi)核):

int cpus = Runtime.getRuntime().availableProcessors(); int maxThreads = cpus * scaleFactor; maxThreads = (maxThreads > 0 ? maxThreads : 1);

然后,我需要使用阻塞隊列來創(chuàng)建ThreadPoolExecutor來限制提交的任務(wù)數(shù)。 為什么? 好吧:目錄掃描算法非常快,并且會生成大量文件以非常快速地處理。 有多大? 很難預(yù)測,其可變性很高。 我不會讓執(zhí)行者的內(nèi)部隊列亂七八糟地用代表我的任務(wù)的對象(包括一個非常大的文件描述符)填充。 我寧愿讓執(zhí)行程序在隊列填滿時拒絕文件。

另外,我將使用ThreadPoolExecutor.CallerRunsPolicy作為拒絕策略。 為什么? 好吧,因為當(dāng)隊列已滿并且池中的線程正在忙于處理文件時,我將擁有正在提交執(zhí)行該文件的任務(wù)的線程。 這樣,掃描將停止處理文件,并在完成當(dāng)前任務(wù)后立即恢復(fù)掃描。

這是創(chuàng)建執(zhí)行程序的代碼:

ExecutorService executorService =new ThreadPoolExecutor(maxThreads, // core thread pool sizemaxThreads, // maximum thread pool size1, // time to wait before resizing poolTimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(maxThreads, true),new ThreadPoolExecutor.CallerRunsPolicy());

代碼的框架如下(已大大簡化):

// scanning loop: fake scanning while (!dirsToProcess.isEmpty()) {File currentDir = dirsToProcess.pop();// listing childrenFile[] children = currentDir.listFiles();// processing childrenfor (final File currentFile : children) {// if it's a directory, defer processingif (currentFile.isDirectory()) {dirsToProcess.add(currentFile);continue;}executorService.submit(new Runnable() {@Overridepublic void run() {try {// if it's a file, process itnew ConvertTask(currentFile).perform();} catch (Exception ex) {// error management logic}}}); }// ...// wait for all of the executor threads to finish executorService.shutdown();try {if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {// pool didn't terminate after the first tryexecutorService.shutdownNow();}if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {// pool didn't terminate after the second try} } catch (InterruptedException ex) {executorService.shutdownNow();Thread.currentThread().interrupt(); }

結(jié)論

如您所見,Java并發(fā)API非常易于使用,非常靈活并且功能強大。 幾年前,我會花更多的精力編寫這樣一個簡單的程序。 這樣,我可以在幾個小時內(nèi)快速解決由遺留的單線程組件引起的可伸縮性問題。

參考: The Gray Blog中的 JCG合作伙伴 Enrico Crisostomo 使用ThreadPoolExecutor并行化獨立的單線程任務(wù) 。

相關(guān)片段:
  • 受限連接池的阻塞隊列示例
  • 更一般的等待/通知機制的CountDownLatch示例
  • 任務(wù)運行器的重入鎖示例
  • 限制URL連接的信號量示例

相關(guān)文章 :

  • Java并發(fā)教程–線程池
  • 有益的CountDownLatch和棘手的Java死鎖
  • 并發(fā)優(yōu)化–減少鎖粒度
  • Java并發(fā)教程– CountDownLatch
  • JVM如何處理鎖
  • Java教程和Android教程列表

翻譯自: https://www.javacodegeeks.com/2011/12/using-threadpoolexecutor-to-parallelize.html

創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎勵來咯,堅持創(chuàng)作打卡瓜分現(xiàn)金大獎

總結(jié)

以上是生活随笔為你收集整理的使用ThreadPoolExecutor并行化独立的单线程任务的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。