日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

线程池学习笔记

發布時間:2023/12/20 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 线程池学习笔记 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

線程

Callable

public class CallableDemo {public static void main(String[] args) throws Exception {ExecutorService executorService = Executors.newCachedThreadPool();Future<String> future = executorService.submit(() -> {log.info("do something in callable");Thread.sleep(5000);return "Done";});log.info("do something in main");Thread.sleep(1000);String result = future.get();log.info("result:{}", result);} }

FutureTask

多線程執行任務時,有比較耗時操作,但又需要其返回結果時,可以使用FutureTask

public class FutureTaskDemo {public static void main(String[] args) throws Exception {FutureTask<String> futureTask = new FutureTask<String>(() -> {log.info("do something in callable");Thread.sleep(5000);return "Done";});new Thread(futureTask).start();log.info("do something in main");Thread.sleep(1000);// 獲取耗時操作的返回結果,這里是堵塞操作String result = futureTask.get();log.info("result:{}", result);} }

Fork/Join

用于并行執行任務,將大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架。使用工作竊取算法,某個線程從其他隊列里竊取任務來執行

  • fork:將大任務切割成若干個子任務并行執行
  • join:合并子任務的執行結果得到大任務的結果

局限性

  • 不能執行I/O操作(讀寫數據文件)
  • 不能拋出檢查異常,必須通過必要的代碼來處理他們
  • 任務只能使用fork和join操作來作為同步機制,如果使用其他同步機制,那么執行任務時,工作線程就不能執行其他任務
  • import lombok.extern.slf4j.Slf4j;import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask;/**** 繼承RecursiveTask,返回值為Integer類型* 覆寫computer方法*/ @Slf4j public class ForkJoinTaskDemo extends RecursiveTask<Integer> {// 閾值public static final int threshold = 2;private int start;private int end;public ForkJoinTaskDemo(int start, int end) {this.start = start;this.end = end;}@Overrideprotected Integer compute() {int retSum = 0;boolean canCompute = (end - start) <= threshold;if (canCompute) {// 如果任務足夠小就計算任務for (int i = start; i <= end; i++) {retSum += i;}} else {// 如果任務大于閾值,就不斷遞歸分裂成兩個子任務計算int middle = (start + end) / 2;ForkJoinTaskDemo leftTask = new ForkJoinTaskDemo(start, middle);ForkJoinTaskDemo rightTask = new ForkJoinTaskDemo(middle + 1, end);// 執行子任務leftTask.fork();rightTask.fork();// 等待任務執行結束合并其結果int leftResult = leftTask.join();int rightResult = rightTask.join();// 合并子任務retSum = leftResult + rightResult;}return retSum;}public static void main(String[] args) {ForkJoinPool forkjoinPool = new ForkJoinPool();// 生成一個計算任務,計算1+2+3+4ForkJoinTaskDemo task = new ForkJoinTaskDemo(1, 100);// 執行一個任務Future<Integer> result = forkjoinPool.submit(task);try {log.info("result:{}", result.get());} catch (Exception e) {log.error("exception", e);}} }

    歸并排序

    package test.thread.pool.merge;import java.util.Arrays; import java.util.Random;/*** 歸并排序* @author yinwenjie*/ public class Merge1 {private static int MAX = 10000;private static int inits[] = new int[MAX];// 這是為了生成一個數量為MAX的隨機整數集合,準備計算數據// 和算法本身并沒有什么關系static {Random r = new Random();for(int index = 1 ; index <= MAX ; index++) {inits[index - 1] = r.nextInt(10000000);}}public static void main(String[] args) {long beginTime = System.currentTimeMillis();int results[] = forkits(inits); long endTime = System.currentTimeMillis();// 如果參與排序的數據非常龐大,記得把這種打印方式去掉System.out.println("耗時=" + (endTime - beginTime) + " | " + Arrays.toString(results)); }// 拆分成較小的元素或者進行足夠小的元素集合的排序private static int[] forkits(int source[]) {int sourceLen = source.length;if(sourceLen > 2) {int midIndex = sourceLen / 2;int result1[] = forkits(Arrays.copyOf(source, midIndex));int result2[] = forkits(Arrays.copyOfRange(source, midIndex , sourceLen));// 將兩個有序的數組,合并成一個有序的數組int mer[] = joinInts(result1 , result2);return mer;} // 否則說明集合中只有一個或者兩個元素,可以進行這兩個元素的比較排序了else {// 如果條件成立,說明數組中只有一個元素,或者是數組中的元素都已經排列好位置了if(sourceLen == 1|| source[0] <= source[1]) {return source;} else {int targetp[] = new int[sourceLen];targetp[0] = source[1];targetp[1] = source[0];return targetp;}}}/*** 這個方法用于合并兩個有序集合* @param array1* @param array2*/private static int[] joinInts(int array1[] , int array2[]) {int destInts[] = new int[array1.length + array2.length];int array1Len = array1.length;int array2Len = array2.length;int destLen = destInts.length;// 只需要以新的集合destInts的長度為標準,遍歷一次即可for(int index = 0 , array1Index = 0 , array2Index = 0 ; index < destLen ; index++) {int value1 = array1Index >= array1Len?Integer.MAX_VALUE:array1[array1Index];int value2 = array2Index >= array2Len?Integer.MAX_VALUE:array2[array2Index];// 如果條件成立,說明應該取數組array1中的值if(value1 < value2) {array1Index++;destInts[index] = value1;}// 否則取數組array2中的值else {array2Index++;destInts[index] = value2;}}return destInts;} }

    歸并排序

    /*** 使用Fork/Join框架的歸并排序算法* @author yinwenjie*/ public class Merge2 {private static int MAX = 100000000;private static int inits[] = new int[MAX];// 同樣進行隨機隊列初始化,這里就不再贅述了static {......}public static void main(String[] args) throws Exception { // 正式開始long beginTime = System.currentTimeMillis();ForkJoinPool pool = new ForkJoinPool();MyTask task = new MyTask(inits);ForkJoinTask<int[]> taskResult = pool.submit(task);try {taskResult.get();} catch (InterruptedException | ExecutionException e) {e.printStackTrace(System.out);}long endTime = System.currentTimeMillis();System.out.println("耗時=" + (endTime - beginTime)); }/*** 單個排序的子任務* @author yinwenjie*/static class MyTask extends RecursiveTask<int[]> {private int source[];public MyTask(int source[]) {this.source = source;}/* (non-Javadoc)* @see java.util.concurrent.RecursiveTask#compute()*/@Overrideprotected int[] compute() {int sourceLen = source.length;// 如果條件成立,說明任務中要進行排序的集合還不夠小if(sourceLen > 2) {int midIndex = sourceLen / 2;// 拆分成兩個子任務MyTask task1 = new MyTask(Arrays.copyOf(source, midIndex));task1.fork();MyTask task2 = new MyTask(Arrays.copyOfRange(source, midIndex , sourceLen));task2.fork();// 將兩個有序的數組,合并成一個有序的數組int result1[] = task1.join();int result2[] = task2.join();int mer[] = joinInts(result1 , result2);return mer;} // 否則說明集合中只有一個或者兩個元素,可以進行這兩個元素的比較排序了else {// 如果條件成立,說明數組中只有一個元素,或者是數組中的元素都已經排列好位置了if(sourceLen == 1|| source[0] <= source[1]) {return source;} else {int targetp[] = new int[sourceLen];targetp[0] = source[1];targetp[1] = source[0];return targetp;}}}private int[] joinInts(int array1[] , int array2[]) {// 和上文中出現的代碼一致}} }

    BlockingQueue

    堵塞隊列,有兩種情況會堵塞

  • 隊列滿時,入隊線程會被堵塞
  • 隊列空時,出對線程會被堵塞
  • 操作Throws ExceptionSpecial ValueBlocksTimes Out
    添加add(o)offer(o)put(o)offer(0,timeout,timeunit)
    移除remove(o)poll()take()poll(timeout,timeunit)
    檢查element()peek()

    線程池

    ThreadPoolExecutor

    Executors

    使用場景

    想要頻繁的創建和銷毀線程的時候

    線程池的概念

    線程池就是提前創建若干個線程,如果有任務需要處理,線程池里的線程就會處理任務,處理完之后線程并不會被銷毀,而是等待下一個任務。由于創建和銷毀線程都是消耗系統資源的

    線程池的優勢

    • 降低創建線程和銷毀線程的性能開銷
    • 提高響應速度,當有新任務需要執行是不需要等待線程創建就可以立馬執行
    • 合理的設置線程池大小(限流)可以避免因為線程數超過硬件資源瓶頸帶來的問題

    Api Executors

    newFixedThreadPool

    該方法返回一個固定數量的線程池,當有一個任務提交時,若線程池中空閑,則立即執行,若沒有,則會被暫緩在一個任務隊列中,等待有空閑的線程去執行,用途:FixedThreadPool 用于負載比較大的服務器,為了資源的合理利用,需要限制當前線程數量。

    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(1);ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

    newSingleThreadExecutor

    創建一個線程的線程池,若空閑則執行,若沒有空閑線程則暫緩在任務隊列中

    ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

    newCachedThreadPool

    根據實際情況調整線程個數,不限制最大線程數,若用空閑的線程則執行任務,若無任務則不創建線程。并且每一個空閑線程會在 60 秒后自動回收

    ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());

    newScheduledThreadPool

    創建一個可以指定線程的數量的線程池,但是這個線程池還帶有延遲和周期性執行任務的功能,類似定時器

    ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory);

    線程池參數

    ThreadPoolExecutor采取上述步驟的總體設計思路,是為了在執行execute()方法時,盡可能地避免獲取全局鎖(那將會是一個嚴重的可伸縮瓶頸)。在ThreadPoolExecutor完成預熱之后 (當前運行的線程數大于等于corePoolSize),幾乎所有的execute()方法調用都是執行步驟2,而 步驟2不需要獲取全局鎖。

    線程池中線程總數、運行線程數、空閑線程數、任務隊列等之間的關系

  • 當【運行的線程數 < corePoolSize】,則直接創建新線程來處理任務,即使線程池中的其他線程是空閑的
  • 當【corePoolSize <= 線程池中線程數 < maximumPoolSize】,則只有當workQueue滿時,才創建新線程處理
  • 當【corePoolSize = maximumPoolSize】,在workQueue沒滿時,那么請求會放入workQueue,等待空閑線程去除任務處理
  • 當【運行的線程數 > maximumPoolSize】,如果workQueue已滿,那么會根據指定策略來處理提交過來的任務
  • ThreadPoolExecutor(int corePoolSize, //核心線程數int maximumPoolSize, //最大線程數long keepAliveTime, //超出核心線程數量以外的線程空余存活時間TimeUnit unit, //存活時間單位BlockingQueue<Runnable> workQueue, //保存執行任務的隊列ThreadFactory threadFactory, //創建新線程使用的工廠RejectedExecutionHandler handler) //當任務無法執行的時候的處理方式

    飽和策略

    RejectedExecutionHandler(飽和策略):當隊列和線程池都滿了,說明線程池處于飽和狀 態,那么必須采取一種策略處理提交的新任務。這個策略默認情況下是AbortPolicy,表示無法 處理新任務時拋出異常。在JDK 1.5中Java線程池框架提供了以下4種策略。

  • AbortPolicy:直接拋出異常。
  • CallerRunsPolicy:只用調用者所在線程來運行任務。
  • DiscardOldestPolicy:丟棄隊列里最近的一個任務,并執行當前任務。
  • DiscardPolicy:不處理,丟棄掉。
  • 當然,也可以根據應用場景需要來實現RejectedExecutionHandler接口自定義策略。如記錄日志或持久化存儲不能處理的任務。

    任務提交

  • execute();//任務提交
  • submit(); //帶有返回值的任務提交
  • 最佳線程數

    最佳線程數目 = (線程等待時間+任務執行時間)/任務執行時間 * CPU數目

    備注:這個公式也是前輩們分享的,當然之前看了淘寶前臺系統優化實踐的文章,和上面的公式很類似,不過在CPU數目那邊,他們更細化了,上面的公式只是參考。不過不管什么公式,最終還是在生產環境中運行后,再優化調整。

    例如服務器CPU核數為4核,一個任務線程cpu耗時為20ms,線程等待(網絡IO、磁盤IO)耗時80ms,那最佳線程數目:( 80 + 20 )/20 * 4 = 20。也就是設置20個線程數最佳。

    合理地配置線程池

    CPU密集型任務應配置盡可能小的線程,如配置Ncpu+1個線程的線程池。由于IO密集型任務線程并不是一直在執行任務,則應配置盡可能多的線程,如2*Ncpu。混合型的任務,如果可以拆分,將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那么分解后執行的吞吐量 將高于串行執行的吞吐量。如果這兩個任務執行時間相差太大,則沒必要進行分解。可以通過 Runtime.getRuntime().availableProcessors()方法獲得當前設備的CPU個數。

    依賴數據庫連接池的任務,因為線程提交SQL后需要等待數據庫返回結果,等待的時間越長,則CPU空閑時間就越長,那么線程數應該設置得越大,這樣才能更好地利用CPU。

    建議使用有界隊列。有界隊列能增加系統的穩定性和預警能力,可以根據需要設大一點 兒,比如幾千。有一次,我們系統里后臺任務線程池的隊列和線程池全滿了,不斷拋出拋棄任 務的異常,通過排查發現是數據庫出現了問題,導致執行SQL變得非常緩慢,因為后臺任務線 程池里的任務全是需要向數據庫查詢和插入數據的,所以導致線程池里的工作線程全部阻 塞,任務積壓在線程池里。如果當時我們設置成無界隊列,那么線程池的隊列就會越來越多, 有可能會撐滿內存,導致整個系統不可用,而不只是后臺任務出現問題。當然,我們的系統所 有的任務是用單獨的服務器部署的,我們使用不同規模的線程池完成不同類型的任務,但是 出現這樣問題時也會影響到其他任務。

    任務的性質

  • CPU密集型任務、
  • IO密集型任務和混合型任務
  • 多線程最佳實踐

  • 使用本變量地
  • 使用不可變類
  • 最小化鎖的作用域范圍:S=1/(1-a+a/n)
  • 使用線程池,而不是直接new Thread()
  • 使用同步也不要使用wait和notify
  • 使用BlockingQueue實現生產消費模式
  • 使用并發集合而不是加鎖的同步集合
  • 使用Semaphore創建有界訪問
  • 使用同步塊而不是同步方法
  • 避免使用靜態變量,否則用final等不可變類
  • 使用案例

    package com.insightfullogic.java8.concurrent;import java.util.ArrayList; import java.util.List; import java.util.concurrent.*;/*** @description: 線程測試類* @author: tiger* @create: 2022-10-07 11:33*/ public class MutilThread {// 建立一個線程池,注意要放在外面,不要每次執行代碼就建立一個,具體線程池的使用就不展開了public static ExecutorService commonThreadPool = new ThreadPoolExecutor(5, 5, 300L,TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());public static void main(String[] args) {// 開始多線程調用List<Future<String>> futures = new ArrayList<>();for (int i = 0; i < 12; i++) {int finalI = i;Future<String> future = (Future<String>) commonThreadPool.submit(() -> {System.out.println(finalI);});futures.add(future);}// 獲取結果List<String> list = new ArrayList<>();try {for (int i = 0; i < futures.size(); i++) {list.add(futures.get(i).get());}} catch (Exception e) { // LOGGER.error("出現錯誤:", e);}} }

    順序調用

    CompletableFuture<A> futureA = CompletableFuture.supplyAsync(() -> doA()); CompletableFuture<B> futureB = CompletableFuture.supplyAsync(() -> doB()); CompletableFuture.allOf(futureA,futureB) // 等a b 兩個任務都執行完成C c = doC(futureA.join(), futureB.join());CompletableFuture<D> futureD = CompletableFuture.supplyAsync(() -> doD(c)); CompletableFuture<E> futureE = CompletableFuture.supplyAsync(() -> doE(c)); CompletableFuture.allOf(futureD,futureE) // 等d e兩個任務都執行完成return doResult(futureD.join(),futureE.join());

    線程池的監控

    如果在系統中大量使用線程池,則有必要對線程池進行監控,方便在出現問題時,可以根 據線程池的使用狀況快速定位問題。可以通過線程池提供的參數進行監控,在監控線程池的 時候可以使用以下屬性。

  • taskCount:線程池需要執行的任務數量。
  • completedTaskCount:線程池在運行過程中已完成的任務數量,小于或等于taskCount。
  • largestPoolSize:線程池里曾經創建過的最大線程數量。通過這個數據可以知道線程池是否曾經滿過。如該數值等于線程池的最大大小,則表示線程池曾經滿過。
  • getPoolSize:線程池的線程數量。如果線程池不銷毀的話,線程池里的線程不會自動銷 毀,所以這個大小只增不減。
  • getActiveCount:獲取活動的線程數。
  • 通過擴展線程池進行監控。可以通過繼承線程池來自定義線程池,重寫線程池的 beforeExecute、afterExecute和terminated方法,也可以在任務執行前、執行后和線程池關閉前執行一些代碼來進行監控。例如,監控任務的平均執行時間、最大執行時間和最小執行時間等。 這幾個方法在線程池里是空方法。

    死鎖

    所謂死鎖是指兩個或兩個以上的進程在執行過程中因爭奪資源而相互等待的現象;如果沒有外力作用,他們則無法推進下去。

    產生死鎖的原因

  • 因為系統資源不足
  • 進程運行推進的順序不合適
  • 資源分配不當等
  • 產生死鎖的必要條件

  • 互斥條件
  • 請求和保持條件
  • 不剝奪條件
  • 環路等待條件
  • package com.tiger.deadLock;import java.util.concurrent.TimeUnit; /*** 死鎖 * @author tiger* @Date 2017年7月27日*/ public class DeadLock {public static void main(String[] args) {Park task = new Park();// 章魚線程Thread th1 = new Thread(task,"章魚");// 光頭線程Thread th2 = new Thread(task,"光頭");th1.start();th2.start();} } class Park implements Runnable{ //兩人共同擁有相同的兩把鎖String[] locks = {"0","1"};@Overridepublic void run() {String name = Thread.currentThread().getName();switch( name ){//光頭用 0 號卡進尖叫地帶"。case "光頭":尖叫地帶( locks[0] ); break;//章魚用 1 號卡進海底世界"。case "章魚":海底世界( locks[1] );break;}}/*** 光頭:持0號卡進外圍的尖叫地帶,玩一陣子后,想持另外一張卡(1號卡)進恐怖森林,但此時0號卡被占用。* @param card*/public void 尖叫地帶(String card){String name = Thread.currentThread().getName();//card 1 先進尖叫地帶synchronized (card) {System.out.println(name+" 進到尖叫地帶");//進去玩耍2秒try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {}/*在外圍玩耍完后,想進一步進到恐怖森林時,手頭只有1號卡可以使用,此時1號卡被其他人(線程)持有還沒有釋放,因此進不了,只能在外頭干等,此時另外一個人也是這種情況,所以造成死鎖。*/System.out.println(name+" 準備進到恐怖盛林");synchronized (locks[1]) {System.out.println(name+" 進到恐怖盛林");}}}/*** 章魚:持1號卡進外圍的海底世界,玩一陣子后,想持另外一張卡(0號卡)進東海龍宮,但此時0號卡被占用。* @param card*/public void 海底世界(String card){String name = Thread.currentThread().getName();// 持1號卡先進海底世界synchronized (card) {System.out.println(name+" 進到海底世界");// 進去玩耍2秒try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {}/*在外圍玩耍完后,想進一步進到東海龍宮時,手頭只有0號卡可以使用,此時0號卡被其他人(線程)持有還沒有釋放,因此進不了,只能在外頭干等,此時另外一個人也是這種情況,所以造成死鎖。*/System.out.println(name+" 準備進到東海龍宮");synchronized (locks[0]) {System.out.println(name+" 進到東海龍宮");}}} }

    總結

    以上是生活随笔為你收集整理的线程池学习笔记的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。