多线程与高并发(七):详解线程池 - 自定义线程池,JDK自带线程池,ForkJoin,源码解析等
Executor 接口關(guān)系
Callable:類似于Runnable,但是可以有返回值
Future:存儲(chǔ)將來(lái)執(zhí)行的結(jié)果。Callable被執(zhí)行完之后的結(jié)果,被封裝到Future里面。
Future 示例:
/*** 認(rèn)識(shí)Callable,對(duì)Runnable進(jìn)行了擴(kuò)展* 對(duì)Callable的調(diào)用,可以有返回值*/ package com.mashibing.juc.c_026_01_ThreadPool;import java.util.concurrent.*;public class T03_Callable {public static void main(String[] args) throws ExecutionException, InterruptedException {Callable<String> c = new Callable() {@Overridepublic String call() throws Exception {return "Hello Callable";}};ExecutorService service = Executors.newCachedThreadPool();Future<String> future = service.submit(c); //異步System.out.println(future.get());//阻塞service.shutdown();} }FutureTask:更加靈活,是Runnable和Future的結(jié)合,既是一個(gè)Runnable,又可以存結(jié)果
FutureTask示例:
/*** 認(rèn)識(shí)FutureTask*/ package com.mashibing.juc.c_026_01_ThreadPool;import java.util.concurrent.*;public class T06_00_Future {public static void main(String[] args) throws InterruptedException, ExecutionException {FutureTask<Integer> task = new FutureTask<>(() -> {TimeUnit.MILLISECONDS.sleep(500);return 1000;}); //new Callable () { Integer call();}new Thread(task).start();System.out.println(task.get()); //阻塞} }CompletableFuture
可以用來(lái)管理多個(gè)Future的結(jié)果,對(duì)各種各樣的結(jié)果進(jìn)行組合處理。你可以去查查它的API~
提供了很多非常好用的接口,十分友好!
示例:假設(shè)你能夠提供一個(gè)服務(wù),這個(gè)服務(wù)查詢各大電商網(wǎng)站同一類產(chǎn)品的價(jià)格并匯總展示,你用CompletableFuture開啟三個(gè)線程來(lái)完成這個(gè)任務(wù)
package com.mashibing.juc.c_026_01_ThreadPool;import java.io.IOException; import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit;public class T06_01_CompletableFuture {public static void main(String[] args) throws ExecutionException, InterruptedException {long start, end;/*start = System.currentTimeMillis();priceOfTM();priceOfTB();priceOfJD();end = System.currentTimeMillis();System.out.println("use serial method call! " + (end - start));*/start = System.currentTimeMillis();CompletableFuture<Double> futureTM = CompletableFuture.supplyAsync(() -> priceOfTM());CompletableFuture<Double> futureTB = CompletableFuture.supplyAsync(() -> priceOfTB());CompletableFuture<Double> futureJD = CompletableFuture.supplyAsync(() -> priceOfJD());CompletableFuture.allOf(futureTM, futureTB, futureJD).join(); // 提供對(duì)于一堆任務(wù)的管理:這三個(gè)任務(wù)全部完成之后,才能繼續(xù)向下運(yùn)行 // CompletableFuture.anyOf(futureTM, futureTB, futureJD).join(); // 任意一個(gè)任務(wù)完成,就能繼續(xù)向下運(yùn)行CompletableFuture.supplyAsync(() -> priceOfTM()).thenApply(String::valueOf).thenApply(str -> "price " + str).thenAccept(System.out::println);end = System.currentTimeMillis();System.out.println("use completable future! " + (end - start));try {System.in.read();//因?yàn)槔锩嫒钱惒?#xff0c;所以需要阻塞一下,才能正常的等待它們輸出,不然主線程先結(jié)束了。。} catch (IOException e) {e.printStackTrace();}}private static double priceOfTM() {delay();return 1.00;}private static double priceOfTB() {delay();return 2.00;}private static double priceOfJD() {delay();return 3.00;}/*private static double priceOfAmazon() {delay();throw new RuntimeException("product not exist!");}*/private static void delay() {int time = new Random().nextInt(500);try {TimeUnit.MILLISECONDS.sleep(time);} catch (InterruptedException e) {e.printStackTrace();}System.out.printf("After random %s sleep!\n", time);} }線程池
ThreadPoolExecutor:我們通常所說(shuō)的線程池
ForkJoinPoll:先將任務(wù)分解,最后再匯總
如何自定義一個(gè)線程池
阿里《Java開發(fā)手冊(cè)》建議自定義線程池,所以我們先來(lái)看看如何自定義一個(gè)線程池。
線程池
維護(hù)兩個(gè)集合:
- 線程集合
- 任務(wù)集合
用的是HashSet
定義一個(gè)自定義線程池,最多有7個(gè)參數(shù),這個(gè)面試經(jīng)常被使勁問(wèn)
總結(jié)
以上是生活随笔為你收集整理的多线程与高并发(七):详解线程池 - 自定义线程池,JDK自带线程池,ForkJoin,源码解析等的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: JVM从入门到精通(六):JVM调优必备
- 下一篇: JVM从入门到精通(七):GC常用参数,