日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java 线程池 源码_java线程池源码分析

發布時間:2025/3/21 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java 线程池 源码_java线程池源码分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

我們在關閉線程池的時候會使用shutdown()和shutdownNow(),那么問題來了:

這兩個方法又什么區別呢?

他們背后的原理是什么呢?

線程池中線程超過了coresize后會怎么操作呢?

為了解決這些疑問我們需要分析java線程池的原理。

1 基本使用

1.1 繼承關系

平常我們在創建線程池經常使用的方式如下:

ExecutorService executorService = Executors.newFixedThreadPool(5);

看下newFixedThreadPool源碼, 其實Executors是個工廠類,內部是new了一個ThreadPoolExecuto:

public static ExecutorService newFixedThreadPool(int nThreads) {

return new ThreadPoolExecutor(nThreads, nThreads,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue());

}

參數的意義就不介紹了,網上有很多內容,看源碼注釋也可以明白。

線程池中類的繼承關系如下:

2 源碼分析

2.1 入口

將一個Runnable放到線程池執行有兩種方式,一個是調用ThreadPoolExecutor#submit,一個是調用ThreadPoolExecutor#execute。其實submit是將Runnable封裝成了一個RunnableFuture,然后再調用execute,最終調用的還是execute,所以我們這里就只從ThreadPoolExecutor#execute開始分析。

2.2 ctl和線程池狀態

ThreadPoolExecutor中有個重要的屬性是ctl

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 高3位表示狀態,低29位表示線程池中線程的多少

private static final int COUNT_BITS = Integer.SIZE - 3; // 32-3 = 29

private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 左移29為減1,即最終得到為高3位為0,低29位為1的數字,作為掩碼,是二進制運算中常用的方法

private static final int RUNNING = -1 << COUNT_BITS; // 高三位111

private static final int SHUTDOWN = 0 << COUNT_BITS; // 高三位000

private static final int STOP = 1 << COUNT_BITS; // 高三位001

private static final int TIDYING = 2 << COUNT_BITS; // 高三位010

private static final int TERMINATED = 3 << COUNT_BITS; // 高三位011

// Packing and unpacking ctl

private static int runStateOf(int c) { return c & ~CAPACITY; } // 保留高3位,即計算線程池狀態

private static int workerCountOf(int c) { return c & CAPACITY; } // 保留低29位, 即計算線程數量

private static int ctlOf(int rs, int wc) { return rs | wc; } // 求ctl

ThreadPoolExecutor中使用32位Integer來表示線程池的狀態和線程的數量,其中高3位表示狀態,低29位表示數量。如果對二進制運行不熟悉可以參考:二進制運算。從上也可以看出線程池有五種狀態,我們關心前3中狀態

RUNNING 接收task和處理queue中的task

SHUTDOWN 不再接收新的task,但是會處理完正在運行的task和queue中的task,不會interrupt正在執行的task,其實調用shutdown后線程池處于該狀態

STOP 不再接收新的task,也不處理queue中的task,同時正在運行的線程會被interrupt。調用shutdownNow后線程池會處于該狀態。

2.3 execute

明白了ctl和線程池的狀態后我們來具體看下execute的處理邏輯

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

int c = ctl.get();

if (workerCountOf(c) < corePoolSize) { // 線程數量小于coresize,那么就調用addWorker

if (addWorker(command, true)) // 這里知道,返回true就不往下走了

return;

c = ctl.get();

}

// 不滿足上述條件,即線程數量 >= coreSize,或者addWorker返回fasle,那么走下面的邏輯

if (isRunning(c) && workQueue.offer(command)) { // 可以看到是往blockingqueue中放task

int recheck = ctl.get();

if (! isRunning(recheck) && remove(command))

reject(command);

else if (workerCountOf(recheck) == 0)

addWorker(null, false);

}

// 如果不滿足上述條件,即blockingqueue也放不進去,那么就走下面的邏輯

else if (!addWorker(command, false))

reject(command);

}

從上面的代碼我們可以看到線程池處理線程的基本思路是: 如果線程數量小于coresize那么就執行task,否則就放到queue中,如果queue也放不下就走下面addWorker,如果也失敗了,那么就調用reject策略。當然還涉及一些細節,需要進一步分析。

2.4 addWorker

execute中反復調用的是addWorker

private boolean addWorker(Runnable firstTask, boolean core) {

retry:

for (;;) {

int c = ctl.get();

int rs = runStateOf(c); // 計算線程池狀態

// Check if queue empty only if necessary.

if (rs >= SHUTDOWN && // 先忽略

! (rs == SHUTDOWN &&

firstTask == null &&

! workQueue.isEmpty()))

return false;

for (;;) {

int wc = workerCountOf(c); // 線程數量

if (wc >= CAPACITY ||

wc >= (core ? corePoolSize : maximumPoolSize)) // 可見如果超過了運行的最大線程數量則返回false

return false;

if (compareAndIncrementWorkerCount(c)) // 如果成功,線程數量肯定加1

break retry;

c = ctl.get(); // Re-read ctl

if (runStateOf(c) != rs)

continue retry;

// else CAS failed due to workerCount change; retry inner loop

}

}

boolean workerStarted = false;

boolean workerAdded = false;

Worker w = null;

try {

final ReentrantLock mainLock = this.mainLock;

w = new Worker(firstTask); // 將task封裝成了Worker

final Thread t = w.thread; // 來獲取worker的thread

if (t != null) {

mainLock.lock();

try {

// Recheck while holding lock.

// Back out on ThreadFactory failure or if

// shut down before lock acquired.

int c = ctl.get();

int rs = runStateOf(c);

if (rs < SHUTDOWN ||

(rs == SHUTDOWN && firstTask == null)) {

if (t.isAlive()) // precheck that t is startable

throw new IllegalThreadStateException();

workers.add(w); // 將worker添加到hashset中報存,關閉的時候要使用

int s = workers.size();

if (s > largestPoolSize)

largestPoolSize = s;

workerAdded = true;

}

} finally {

mainLock.unlock();

}

if (workerAdded) { // 經過一些檢查, 啟動了work的thread

t.start();

workerStarted = true;

}

}

} finally {

if (! workerStarted)

addWorkerFailed(w); // 如果線程啟動失敗,則將線程數減1

}

return workerStarted;

}

上面的代碼看起來比較復雜,但是如果我們忽略具體的細節,從大致思路上看,其實也比較簡單。上面代碼的主要思路就是:除了一些狀態檢查外,首先將線程數量加1,然后將runnable分裝成一個worker,去啟動worker線程,如果啟動失敗則再將線程數量減1。返回false的原因可能是線程數量大于允許的數量。所以addWorker調用成功,則會啟動一個work線程,且線程池中線程數量加1

2.5 worker

woker是線程池中真正的線程實體。線程池中的線程不是自定義的Runnable實現的線程,而是woker線程,worker在run方法里調用了自定義的Runnable的run方法。

Worker繼承了AQS,并實現了runnable接口:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

Worker(Runnable firstTask) {

setState(-1); // inhibit interrupts until runWorker

this.firstTask = firstTask;

this.thread = getThreadFactory().newThread(this); // 這個時候回頭看看addWorker中t.start(), 就明白了啟動的實際是一個Woker線程,而不是用戶定義的Runnable

}

public void run() {

runWorker(this);

}

}

Worker中firstTask存儲了用戶定義的Runnable,thread是以他自身為參數的Thread對象。getThreadFactory()默認返回是Executors#DefaultThreadFactory,用來新建線程,并定義了線程名稱的前綴等:

static class DefaultThreadFactory implements ThreadFactory {

private static final AtomicInteger poolNumber = new AtomicInteger(1);

private final ThreadGroup group;

private final AtomicInteger threadNumber = new AtomicInteger(1);

private final String namePrefix;

DefaultThreadFactory() {

SecurityManager s = System.getSecurityManager();

group = (s != null) ? s.getThreadGroup() :

Thread.currentThread().getThreadGroup();

namePrefix = "pool-" +

poolNumber.getAndIncrement() +

"-thread-"; //

}

public Thread newThread(Runnable r) { // 調用后新建一個線程

Thread t = new Thread(group, r,

namePrefix + threadNumber.getAndIncrement(),

0);

if (t.isDaemon())

t.setDaemon(false);

if (t.getPriority() != Thread.NORM_PRIORITY)

t.setPriority(Thread.NORM_PRIORITY);

return t;

}

}

2.6 runWoker

Worker的run方法調用了runWorker,并將自身作為參數傳了進去,下面看看問題的關鍵:runWorker:

final void runWorker(Worker w) {

Thread wt = Thread.currentThread();

Runnable task = w.firstTask;

w.firstTask = null;

w.unlock(); // allow interrupts

boolean completedAbruptly = true;

try {

while (task != null || (task = getTask()) != null) { // 注意這里的while循環,這里很關鍵。這里注意,如果兩個條件都滿足了,那么線程就結束了

w.lock(); // 注意worker繼承了AQS,相當于自己實現了鎖,這個在關閉線程的時候有用

if ((runStateAtLeast(ctl.get(), STOP) ||

(Thread.interrupted() &&

runStateAtLeast(ctl.get(), STOP))) &&

!wt.isInterrupted())

wt.interrupt();

try {

beforeExecute(wt, task);

Throwable thrown = null;

try {

task.run(); // 僅僅是回調了Runnable的run方法

} catch (RuntimeException x) {

thrown = x; throw x;

} catch (Error x) {

thrown = x; throw x;

} catch (Throwable x) {

thrown = x; throw new Error(x);

} finally {

afterExecute(task, thrown);

}

} finally {

task = null; // 重點,task執行完后就被置位null

w.completedTasks++;

w.unlock();

}

}

completedAbruptly = false;

} finally {

processWorkerExit(w, completedAbruptly); // 注意while循環結束后worker線程就結束了

}

}

runWorker中有個while循環,while中判斷條件為(task != null || (task = getTask()) != null)。假設我們按照正常的邏輯,即task != null,則會調用task.run方法,執行完run方法后然后在finally中task被置為null;接著又進入while循環判斷,這次task == null,所以不符合第一個判斷條件,則會繼續判斷 task == getTask()) != null。我們來看下getTask做了什么。

2.7 getTask

private Runnable getTask() {

boolean timedOut = false; // Did the last poll() time out?

retry:

for (;;) {

int c = ctl.get();

int rs = runStateOf(c);

// Check if queue empty only if necessary.

if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 當調用shutdown()方法的時候,線程狀態就為shutdown了; 當調用shutdownow()的時候,線程狀態就為stop了

decrementWorkerCount();

return null;

}

boolean timed; // Are workers subject to culling?

for (;;) { // 通過死循環設置狀態

int wc = workerCountOf(c);

// 設置允許core線程timeout或者線程數量大于coresize,則允許線程超時

timed = allowCoreThreadTimeOut || wc > corePoolSize;

// 如果線程數量 <= 最大線程數 且 沒有超時和允許超時 則跳出死循環

if (wc <= maximumPoolSize && ! (timedOut && timed))

break;

if (compareAndDecrementWorkerCount(c))

return null;

c = ctl.get(); // Re-read ctl

if (runStateOf(c) != rs)

continue retry;

// else CAS failed due to workerCount change; retry inner loop

}

try {

// 這里是關鍵,如果允許超時則調用poll從queue中取出task,否則就調用take可阻塞的獲取task

Runnable r = timed ?

workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

workQueue.take();

if (r != null) // 獲取到task則返回,然后runWorker的while循環就繼續執行,并調用task的run方法

return r;

timedOut = true; // 否則設置為timeOut,繼續循環,但是下次循環會走到if (compareAndDecrementWorkerCount(c)) 處,并返回null。

} catch (InterruptedException retry) {

timedOut = false;

}

}

}

忽略掉具體細節,getTask的整體思路是: 從blockqueue中拿去task,如果queue中沒有task則分兩種情況:

如果允許超時則調用poll(keepAliveTime, TimeUnit.NANOSECONDS),在規定時間沒有返回了則getTask返回null,runWorker結束while循環,work線程結束。當線程數量大于coresize且blockqueue滿的時候且小于maxsize的時候,新創建的線程便是走這個邏輯;或者允許core線程超時的時候也是走這個邏輯

如果不允許超時,則會一直阻塞直到blockqueue中有了新的task。take方法阻塞則表示worker線程也阻塞,也就是在沒有task執行的情況下,worker線程便會阻塞等待。core線程走的就是這個邏輯。

這個時候回頭再看下runWorker,如果task != null,那么就會執行task的run方法,執行完后task就會為被置為null,再次進入while循環執行getTask阻塞在這里了。通過這種方式保留住了線程。如果while循環結束了,那么worker線程也就結束了。

2.8 再看addWorker

分析到這里我們再來看下addWoker。addWorker可以將第一個參數設置為null。例如ThreadPoolExecutor#prestartAllCoreThreads:

public int prestartAllCoreThreads() {

int n = 0;

while (addWorker(null, true)) // addWorker第一個參數是null

++n;

return n;

}

經過前面的分析,我們知道addWoker用來啟動一個worker線程,worker線程調用runWorker來執行,而runWorker中有個while循環,判斷條件是(task != null || (task = getTask()) != null)。因為我們傳入的task為null,所以就會判斷task = getTask()) != null,而getTask就是去blockqueue中拿去數據,如果沒有任務就會阻塞住。這個時候就是一個阻塞的線程在等待task的到來了。所以傳入參數為null表示創建一個空的線程,什么都不執行。

2.9 再看execute

已經知道了線程池內部的大概工作情況,我們再來看下如果所有core線程都創建好了且處于空置狀態,這個時候新放入一個線程的執行流程。

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

int c = ctl.get();

if (workerCountOf(c) < corePoolSize) { // core線程都創建好了,所以判斷條件不滿足

if (addWorker(command, true))

return;

c = ctl.get();

}

// 會走到這里,會通過offer往blockingqueue里放置一個task。這個時候阻塞的core線程會通過blockingqueue的take拿到task執行,類似一個生產者消費者的情況

if (isRunning(c) && workQueue.offer(command)) {

int recheck = ctl.get();

if (! isRunning(recheck) && remove(command))

reject(command);

else if (workerCountOf(recheck) == 0)

addWorker(null, false);

}

// 如果blockingqueue添加失敗,則創建線程直到maxsize

else if (!addWorker(command, false))

reject(command);

}

可見,線程和execute通過blockingqueue來通信,而不是其他方式,execute往blockingqueue中放置task,線程通過take來獲取。整體線程池的邏輯如下圖

2.10 shutdown

這個時候我們終于可以來看看shutdown和shutdownNow了

看下shutdown

public void shutdown() {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

checkShutdownAccess();

advanceRunState(SHUTDOWN); // 重點,將線程狀態置為shutdown,這樣getTask等workqueue為空后就返回null了

interruptIdleWorkers(); // 重點

onShutdown(); // 什么都沒做

} finally {

mainLock.unlock();

}

tryTerminate();

}

private void interruptIdleWorkers() {

interruptIdleWorkers(false);

}

private void interruptIdleWorkers(boolean onlyOne) {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

for (Worker w : workers) {

Thread t = w.thread;

// 線程沒有中斷 且 獲取到worker的鎖

if (!t.isInterrupted() && w.tryLock()) {

try {

t.interrupt(); // 調用interrup,中斷線程

} catch (SecurityException ignore) {

} finally {

w.unlock();

}

}

if (onlyOne)

break;

}

} finally {

mainLock.unlock();

}

}

shutdown的核心方法在interruptIdleWorkers里,這里可以看到在t.interrupt的時候有個判斷添加,一個是線程沒有設置中斷標記,第二個是獲取到worker的鎖,我們注意下第二個條件。回頭看下runWorker,while中執行task的run方法的時候,會先獲取到worker線程的鎖,所以如果線程正在執行task的run方法,則shutdown的時候會獲取鎖失敗,也就不會中斷線程了。這里可以得出結論:shutdown不會中斷正在執行的線程。

如果blockingqueu中有task還沒執行完呢? 這個時候while中的take并不會阻塞,也不會被中斷,shutdown中也沒有清空blockingqueue的操作。所以可以得出結論:shutdown會等blockingqueue中的task執行完成再關閉。可以說shutdown是一種比較溫柔的關閉方式了。

如果core線程都阻塞在take方法上了,即沒有正在執行的task了,那么這個時候 t.interrupt則會中斷take方法,worker線程的while循環結束,worker線程結束。當所有的worker線程都結束后線程池就關閉了

總結下就是: shutdown會把它被調用前放到線程池中的task全部執行完。

2.11 shutdownNow

再來看下shutdownNow

public List shutdownNow() {

List tasks;

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

checkShutdownAccess();

advanceRunState(STOP); // 重點,將線程狀態置為stop

interruptWorkers(); // 重點

tasks = drainQueue(); // 重點

} finally {

mainLock.unlock();

}

tryTerminate();

return tasks;

}

private void interruptWorkers() {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

for (Worker w : workers)

w.interruptIfStarted();

} finally {

mainLock.unlock();

}

}

void interruptIfStarted() {

Thread t;

if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { // 沒有去獲取woker的鎖

try {

t.interrupt();

} catch (SecurityException ignore) {

}

}

}

private List drainQueue() {

BlockingQueue q = workQueue;

List taskList = new ArrayList();

q.drainTo(taskList); // 將blockingqueue中的task清空

if (!q.isEmpty()) {

for (Runnable r : q.toArray(new Runnable[0])) {

if (q.remove(r))

taskList.add(r);

}

}

return taskList;

}

從上面的代碼可以看出:

shutdownNow不會去獲取worker的鎖,所以shutdownNow會導致正在運行的task也被中斷

shutdownNow會將blockingqueue中的task清空,所以在blockingqueue中的task也不會被執行

總結就是shutdownNow比較粗暴,調用他后,他會將所有之前提交的任務都interrupt,且將blockingqueue中的task清空

另外就是不論是shutdown還是shutdownNow都是調用Thread的interrupt()方法。如果task不響應中斷或者忽略中斷標記,那么這個線程就不會被終止。例如在run中執行以下邏輯

poolExecutor.execute(new Runnable() {

@Override

public void run() {

while (true) {

System.out.println("b");

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

System.out.printf("不處理"); // 忽略中斷

}

}

}

});

運行結果是,即使調用了shutdownNow也終止不了線程運行

b

0

不處理b

b

b

b

b

....

3 總結

線程通過while循環不停的從blockingqueue中獲取task來保留線程,避免重復重建線程

4 參考

總結

以上是生活随笔為你收集整理的java 线程池 源码_java线程池源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。