生活随笔
收集整理的這篇文章主要介紹了
线程池是如何复用的?
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
問題:
線程池是如何實現線程復用,如何并行執行多個任務的。
簡單:
一般都是介紹,核心線程和最大線程數量,介紹創建線程的規則。缺少了,如何實現復用的。
本文以這個為出發點,簡單分析線程池的復用。其實就是簡單的幾行源碼分析,和線程池組件分析。
線程池的組件
-
N個線程(core,Max)
可以執行任務的若干個容器
-
阻塞隊列 BlockingQueue
存放待執行任務
線程創建規則core/max
略
線程復用
即,如何將放入線程中的諸多任務,在N個線程中執行的。
-
ThreadPoolExecutor.execute()
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}
分析:可以看出:ThreadPoolExecutor.execute()的功能就是:
1、將任務添加至阻塞隊列workQueue,workQueue.offer(command)
2、根據core和maxPool,選擇是否創建Worker,addWorker()
因此,線程復用的實現應該在worker中,打開addWorker()方法觀察
private boolean addWorker(Runnable firstTask, boolean core) {//創建workerretry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}//啟動workerboolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {//ThreadExecutor的全局鎖,在創建\銷毀worker工作池的時候,才會用到final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}
分析:addworker分為兩部分:1、創建worker,2、啟動worker
規則校驗:
與core和maxPool數量的規則相同
創建worker:
獲取ThreadLocal的全局鎖。 安全的創建Worker。
t.start();
因此:重點又回到了Worker的run方法上
public void run() {runWorker(this);
}
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}
分析:這里就比較清晰了:
1、通過getTask()方法,獲取待執行的任務。
2、通過task.run();執行具體的任務。
3、正常情況,只有當所有任務執行完畢才會停止運行。
因此:
1、進一步分析getTask()
2、執行task.run()方法。-->>這里可以看出,事實上線程在執行任務的時候,本質上是調用了任務自身的run/call方法。
==》》有點像是thread.get(threadlocal) 本質上是調用了 threadlocalMap.get(thread) 的感覺
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
分析:也不用把代碼完全細節完全深究,可以發現方法是從workQueue中獲取task的,所以最終的問題就是看這個變量workQueue是誰的成員變量。
public class ThreadPoolExecutor extends AbstractExecutorService {private final BlockingQueue<Runnable> workQueue;。。。
}
分析,getTask是從線程池中,獲取的任務。即所有的任務都放在ThreadPoolExecutor中,線程池啟動多個Worker去執行任務,每個worker不停的從ThreadPoolExector的workQueue中取出任務,比你高執行task.run()方法,直至所有的任務執行完畢。
至此分析完畢。
總結
阻塞隊列 BlockingQueue
與線程池綁定,負責存放所有的待執行任務。
N個線程(core,Max)
本質上是指N個Worker對象
執行:
1、ThreadPoolExeuctor.execute();
根據規則,創建工人Worker,將提交的任務,添加任務至阻塞隊列
2、Worker
真正執行任務的方法,不停的循環,直至所有的任務執行完畢,或者exit
3、阻塞隊列 BlockingQueue
存放所有的待執行任務
說明
1、本文的目的是,解釋如何實現線程復用,提交給多線程的任務是如何被執行的?;谶@個目的出發,本文已經完成了目標。
2、因此,本文不是完整的介紹線程池,因此對許多知識點沒有深究。
?
總結
以上是生活随笔為你收集整理的线程池是如何复用的?的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。