日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

java线程池的使用学习

發(fā)布時(shí)間:2025/3/21 编程问答 17 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java线程池的使用学习 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

目錄

  • 1. 線程池的創(chuàng)建
  • 2. 線程池的運(yùn)行規(guī)則
  • 3. 線程池的關(guān)閉
  • 4. 線程池的使用場(chǎng)合
  • 5. 線程池大小的設(shè)置
  • 6 實(shí)現(xiàn)舉例

1. 線程池的創(chuàng)建

線程池的創(chuàng)建使用ThreadPoolExecutor類(lèi),有利于編碼時(shí)更好的明確線程池運(yùn)行規(guī)則。

//構(gòu)造函數(shù)/*** Creates a new {@code ThreadPoolExecutor} with the given initial* parameters.** @param corePoolSize the number of threads to keep in the pool, even* if they are idle, unless {@code allowCoreThreadTimeOut} is set* @param maximumPoolSize the maximum number of threads to allow in the* pool* @param keepAliveTime when the number of threads is greater than* the core, this is the maximum time that excess idle threads* will wait for new tasks before terminating.* @param unit the time unit for the {@code keepAliveTime} argument* @param workQueue the queue to use for holding tasks before they are* executed. This queue will hold only the {@code Runnable}* tasks submitted by the {@code execute} method.* @param threadFactory the factory to use when the executor* creates a new thread* @param handler the handler to use when execution is blocked* because the thread bounds and queue capacities are reached* @throws IllegalArgumentException if one of the following holds:<br>* {@code corePoolSize < 0}<br>* {@code keepAliveTime < 0}<br>* {@code maximumPoolSize <= 0}<br>* {@code maximumPoolSize < corePoolSize}* @throws NullPointerException if {@code workQueue}* or {@code threadFactory} or {@code handler} is null*/public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}

參數(shù)含義

(1) 核心線程數(shù)corePoolSize: 保持在池中的線程數(shù)

(2) 最大線程數(shù)maximumPoolSize

(3) 保活時(shí)間keepAliveTime: 線程數(shù)大于corePoolSize,閑置線程最大空閑時(shí)間

(4) 時(shí)間單位unit

(5) 阻塞隊(duì)列workQueue
java.util.concurrent.BlockingQueue主要實(shí)現(xiàn)類(lèi)有:

  • ArrayBlockingQueue: 數(shù)組結(jié)構(gòu)有界阻塞隊(duì)列,FIFO排序。其構(gòu)造函數(shù)必須設(shè)置隊(duì)列長(zhǎng)度。
  • LinkedBlockingQueue:鏈表結(jié)構(gòu)有界阻塞隊(duì)列,FIFO排序。隊(duì)列默認(rèn)最大長(zhǎng)度為Integer.MAX_VALUE,故可能會(huì)堆積大量請(qǐng)求,導(dǎo)致OOM。
  • PriorityBlockingQueue:支持優(yōu)先級(jí)排序的無(wú)界阻塞隊(duì)列。默認(rèn)自然順序排列,可以通過(guò)比較器comparator指定排序規(guī)則。
  • DelayQueue:支持延時(shí)獲取元素的無(wú)界阻塞隊(duì)列。隊(duì)列使用PriorityQueue實(shí)現(xiàn)。

(6) 線程創(chuàng)建接口threadFactory

  • 默認(rèn)使用Executors.defaultThreadFactory()。
  • 可以自定義ThreadFactory實(shí)現(xiàn)或使用第三方實(shí)現(xiàn),方便指定有意義的線程名稱(chēng)
import com.google.common.util.concurrent.ThreadFactoryBuilder; ...ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("my-pool-%d").build(); public class MyThreadFactory implements ThreadFactory {private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;MyThreadFactory(String namePrefix) {this.namePrefix = namePrefix+"-";}@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread( r,namePrefix + threadNumber.getAndIncrement());if (t.isDaemon()) {t.setDaemon(true);}if (t.getPriority() != Thread.NORM_PRIORITY) {t.setPriority(Thread.NORM_PRIORITY);}return t;} }

(7) 飽和策略handler

  • ThreadPoolExecutor.AbortPolicy():終止策略(默認(rèn)) , 拋出java.util.concurrent.RejectedExecutionException異常。
  • ThreadPoolExecutor.CallerRunsPolicy(): 重試添加當(dāng)前的任務(wù),他會(huì)自動(dòng)重復(fù)調(diào)用execute()方法。
  • ThreadPoolExecutor.DiscardOldestPolicy(): 拋棄下一個(gè)即將被執(zhí)行的任務(wù),然后嘗試重新提交新的任務(wù)。最好不和優(yōu)先級(jí)隊(duì)列一起使用,因?yàn)樗鼤?huì)拋棄優(yōu)先級(jí)最高的任務(wù)。
  • ThreadPoolExecutor.DiscardPolicy(): 拋棄策略, 拋棄當(dāng)前任務(wù)

2. 線程池的運(yùn)行規(guī)則

execute添加任務(wù)到線程池:
一個(gè)任務(wù)通過(guò)execute(Runnable)方法被添加到線程池。任務(wù)是一個(gè) Runnable類(lèi)型的對(duì)象,任務(wù)的執(zhí)行方法就是 Runnable類(lèi)型對(duì)象的run()方法。

線程池運(yùn)行規(guī)則:
當(dāng)一個(gè)任務(wù)通過(guò)execute(Runnable)方法添加到線程池時(shí):

  • 如果此時(shí)線程池中的數(shù)量小于corePoolSize,即使線程池中的線程都處于空閑狀態(tài),也要?jiǎng)?chuàng)建新的線程來(lái)處理被添加的任務(wù)。
  • 如果此時(shí)線程池中的數(shù)量等于 corePoolSize,但是緩沖隊(duì)列 workQueue未滿,那么任務(wù)被放入緩沖隊(duì)列。
  • 如果此時(shí)線程池中的數(shù)量大于corePoolSize,緩沖隊(duì)列workQueue滿,并且線程池中的數(shù)量小于maximumPoolSize,建新的線程來(lái)處理被添加的任務(wù)。
  • 如果此時(shí)線程池中的數(shù)量大于corePoolSize,緩沖隊(duì)列workQueue滿,并且線程池中的數(shù)量等于maximumPoolSize,那么通過(guò) handler所指定的策略來(lái)處理此任務(wù)。

  • 也就是:處理任務(wù)的優(yōu)先級(jí)為:

    核心線程corePoolSize - > 任務(wù)隊(duì)列workQueue - > 最大線程maximumPoolSize
    如果三者都滿了,使用handler策略處理該任務(wù)。

  • 當(dāng)線程池中的線程數(shù)量大于 corePoolSize時(shí),如果某線程空閑時(shí)間超過(guò)keepAliveTime,線程將被終止。這樣,線程池可以動(dòng)態(tài)的調(diào)整池中的線程數(shù)。

// execute方法源碼實(shí)現(xiàn)(jdk1.8)public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}

3. 線程池的關(guān)閉

通過(guò)調(diào)用線程池的shutdown或shutdownNow方法來(lái)關(guān)閉線程池。

  • shutdown:將線程池的狀態(tài)設(shè)置成SHUTDOWN狀態(tài),然后interrupt空閑線程。
  • shutdownNow:線程池的狀態(tài)設(shè)置成STOP,然后嘗試interrupt所有線程,包括正在運(yùn)行的。

關(guān)于線程池狀態(tài),源碼中的注釋比較清晰:

再看一下源代碼:

// 在關(guān)閉中,之前提交的任務(wù)會(huì)被執(zhí)行(包含正在執(zhí)行的,在阻塞隊(duì)列中的),但新任務(wù)會(huì)被拒絕。public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();// 狀態(tài)設(shè)置為shutdownadvanceRunState(SHUTDOWN);// interrupt空閑線程interruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}// 嘗試終止線程池tryTerminate();}

其中,interruptIdleWorkers()方法往下調(diào)用了interruptIdleWorkers(), 這里w.tryLock()比較關(guān)鍵。
中斷之前需要先tryLock()獲取worker鎖,正在運(yùn)行的worker tryLock()失敗(runWorker()方法會(huì)先對(duì)worker上鎖),故正在運(yùn)行的worker不能中斷。

// 嘗試停止所有正在執(zhí)行的任務(wù),停止對(duì)等待任務(wù)的處理,并返回正在等待被執(zhí)行的任務(wù)列表public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();// 狀態(tài)設(shè)置為STOPadvanceRunState(STOP);// 停止所有線程 interruptWorkers邏輯簡(jiǎn)單些,循環(huán)對(duì)所有worker調(diào)用interruptIfStarted().(interrupt所有線程)interruptWorkers();tasks = drainQueue();} finally {mainLock.unlock();}tryTerminate();return tasks;}

4. 線程池的使用場(chǎng)合

(1)單個(gè)任務(wù)處理的時(shí)間比較短;
(2)需要處理的任務(wù)數(shù)量大;

5. 線程池大小的設(shè)置

可根據(jù)計(jì)算任務(wù)類(lèi)型估算線程池設(shè)置大小:

cpu密集型:可采用Runtime.avaliableProcesses()+1個(gè)線程;
IO密集型:由于阻塞操作多,可使用更多的線程,如2倍cpu核數(shù)。

6 實(shí)現(xiàn)舉例

場(chǎng)景: ftp服務(wù)器收到文件后,觸發(fā)相關(guān)搬移/處理操作。

public class FtpEventHandler extends DefaultFtplet {@Overridepublic FtpletResult onUploadEnd(FtpSession session, FtpRequest request)throws FtpException, IOException {// 獲取文件名String fileName = request.getArgument();Integer index = fileName.lastIndexOf("/");String realFileName = fileName.substring(index + 1);index = realFileName.lastIndexOf("\\");realFileName = realFileName.substring(index + 1);// **處理文件**ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 50, 10,TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3));threadPool.execute(new fileSenderThread(realFileName));return FtpletResult.DEFAULT;} }

Spring也提供了ThreadPoolTaskExecutor

<!--spring.xml配置示例--><bean id="gkTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"><property name="allowCoreThreadTimeOut" value="true"/><property name="corePoolSize" value="10"/><property name="maxPoolSize" value="50"/><property name="queueCapacity" value="3"/><property name="keepAliveSeconds" value="10"/><property name="rejectedExecutionHandler"value="#{new java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy()}"/><property name="threadNamePrefix" value="gkTaskExecutor"/></bean>//java代碼中注入bean@Autowired @Qualifier("gkTaskExecutor") private ThreadPoolTaskExecutor gkTaskExecutor;

end.

轉(zhuǎn)載于:https://www.cnblogs.com/eaglediao/p/7742570.html

《新程序員》:云原生和全面數(shù)字化實(shí)踐50位技術(shù)專(zhuān)家共同創(chuàng)作,文字、視頻、音頻交互閱讀

總結(jié)

以上是生活随笔為你收集整理的java线程池的使用学习的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。