【Java线程】线程池的原理和实现
1、為什么要使用線程池?
線程池是Java5提供的一個(gè)新技術(shù),方便我們快速簡(jiǎn)潔的定義線程池。
諸如 Web 服務(wù)器、數(shù)據(jù)庫(kù)服務(wù)器、文件服務(wù)器或郵件服務(wù)器之類(lèi)的許多服務(wù)器應(yīng)用程序都面向處理來(lái)自某些遠(yuǎn)程來(lái)源的大量短小的任務(wù)。請(qǐng)求以某種方式到達(dá)服務(wù)器,這種方式可能是通過(guò)網(wǎng)絡(luò)協(xié)議(例如 HTTP、FTP 或 POP)、通過(guò) JMS 隊(duì)列或者可能通過(guò)輪詢數(shù)據(jù)庫(kù)。不管請(qǐng)求如何到達(dá),服務(wù)器應(yīng)用程序中經(jīng)常出現(xiàn)的情況是:單個(gè)任務(wù)處理的時(shí)間很短而請(qǐng)求的數(shù)目卻是巨大的。
構(gòu)建服務(wù)器應(yīng)用程序的一個(gè)過(guò)于簡(jiǎn)單的模型應(yīng)該是:每當(dāng)一個(gè)請(qǐng)求到達(dá)就創(chuàng)建一個(gè)新線程,然后在新線程中為請(qǐng)求服務(wù)。實(shí)際上,對(duì)于原型開(kāi)發(fā)這種方法工作得很好,但如果試圖部署以這種方式運(yùn)行的服務(wù)器應(yīng)用程序,那么這種方法的嚴(yán)重不足就很明顯。每個(gè)請(qǐng)求對(duì)應(yīng)一個(gè)線程(thread-per-request)方法的不足之一是:為每個(gè)請(qǐng)求創(chuàng)建一個(gè)新線程的開(kāi)銷(xiāo)很大;為每個(gè)請(qǐng)求創(chuàng)建新線程的服務(wù)器在創(chuàng)建和銷(xiāo)毀線程上花費(fèi)的時(shí)間和消耗的系統(tǒng)資源要比花在處理實(shí)際的用戶請(qǐng)求的時(shí)間和資源更多。
除了創(chuàng)建和銷(xiāo)毀線程的開(kāi)銷(xiāo)之外,活動(dòng)的線程也消耗系統(tǒng)資源。在一個(gè) JVM 里創(chuàng)建太多的線程可能會(huì)導(dǎo)致系統(tǒng)由于過(guò)度消耗內(nèi)存而用完內(nèi)存或“切換過(guò)度”。為了防止資源不足,服務(wù)器應(yīng)用程序需要一些辦法來(lái)限制任何給定時(shí)刻處理的請(qǐng)求數(shù)目。
線程池為線程生命周期開(kāi)銷(xiāo)問(wèn)題和資源不足問(wèn)題提供了解決方案。通過(guò)對(duì)多個(gè)任務(wù)重用線程,線程創(chuàng)建的開(kāi)銷(xiāo)被分?jǐn)偟搅硕鄠€(gè)任務(wù)上。其好處是,因?yàn)樵谡?qǐng)求到達(dá)時(shí)線程已經(jīng)存在,所以無(wú)意中也消除了線程創(chuàng)建所帶來(lái)的延遲。這樣,就可以立即為請(qǐng)求服務(wù),使應(yīng)用程序響應(yīng)更快。而且,通過(guò)適當(dāng)?shù)卣{(diào)整線程池中的線程數(shù)目,也就是當(dāng)請(qǐng)求的數(shù)目超過(guò)某個(gè)閾值時(shí),就強(qiáng)制其它任何新到的請(qǐng)求一直等待,直到獲得一個(gè)線程來(lái)處理為止,從而可以防止資源不足。
2、線程池的實(shí)現(xiàn)原理
多線程技術(shù)主要解決處理器單元內(nèi)多個(gè)線程執(zhí)行的問(wèn)題,它可以顯著減少處理器單元的閑置時(shí)間,增加處理器單元的吞吐能力。
假設(shè)一個(gè)服務(wù)器完成一項(xiàng)任務(wù)所需時(shí)間為:T1 創(chuàng)建線程時(shí)間,T2 在線程中執(zhí)行任務(wù)的時(shí)間,T3 銷(xiāo)毀線程時(shí)間。
如果:T1 + T3 遠(yuǎn)大于 T2,則可以采用線程池,以提高服務(wù)器性能。
一個(gè)線程池包括以下四個(gè)基本組成部分:
1、線程池管理器(ThreadPool):用于創(chuàng)建并管理線程池,包括 創(chuàng)建線程池,銷(xiāo)毀線程池,添加新任務(wù);
2、工作線程(PoolWorker):線程池中線程,在沒(méi)有任務(wù)時(shí)處于等待狀態(tài),可以循環(huán)的執(zhí)行任務(wù);
3、任務(wù)接口(Task):每個(gè)任務(wù)必須實(shí)現(xiàn)的接口,以供工作線程調(diào)度任務(wù)的執(zhí)行,它主要規(guī)定了任務(wù)的入口,任務(wù)執(zhí)行完后的收尾工作,任務(wù)的執(zhí)行狀態(tài)等;
4、任務(wù)隊(duì)列(taskQueue):用于存放沒(méi)有處理的任務(wù)。提供一種緩沖機(jī)制。
線程池技術(shù)正是關(guān)注如何縮短或調(diào)整T1,T3時(shí)間的技術(shù),從而提高服務(wù)器程序性能的。它把T1,T3分別安排在服務(wù)器程序的啟動(dòng)和結(jié)束的時(shí)間段或者一些空閑的時(shí)間段,這樣在服務(wù)器程序處理客戶請(qǐng)求時(shí),不會(huì)有T1,T3的開(kāi)銷(xiāo)了。
線程池不僅調(diào)整T1,T3產(chǎn)生的時(shí)間段,而且它還顯著減少了創(chuàng)建線程的數(shù)目,看一個(gè)例子:
假設(shè)一個(gè)服務(wù)器一天要處理50000個(gè)請(qǐng)求,并且每個(gè)請(qǐng)求需要一個(gè)單獨(dú)的線程完成。在線程池中,線程數(shù)一般是固定的,所以產(chǎn)生線程總數(shù)不會(huì)超過(guò)線程池中線程的數(shù)目,而如果服務(wù)器不利用線程池來(lái)處理這些請(qǐng)求則線程總數(shù)為50000。一般線程池大小是遠(yuǎn)小于50000。所以利用線程池的服務(wù)器程序不會(huì)為了創(chuàng)建50000而在處理請(qǐng)求時(shí)浪費(fèi)時(shí)間,從而提高效率。
代碼實(shí)現(xiàn)中并沒(méi)有實(shí)現(xiàn)任務(wù)接口,而是把Runnable對(duì)象加入到線程池管理器(ThreadPool),然后剩下的事情就由線程池管理器(ThreadPool)來(lái)完成了
/** * 線程管理器:創(chuàng)建線程,執(zhí)行任務(wù),銷(xiāo)毀線程,獲取線程基本信息 */ public final class ThreadPool { // 線程池中默認(rèn)線程的個(gè)數(shù)為5 private static int worker_num = 5; // 工作線程 private WorkThread[] workThrads; // 已完成的任務(wù)個(gè)數(shù) private static volatile int finished_task = 0; // 任務(wù)隊(duì)列,作為一個(gè)緩沖,List線程不安全 private List<Runnable> taskQueue = new LinkedList<Runnable>(); private static ThreadPool threadPool; // 創(chuàng)建具有默認(rèn)線程個(gè)數(shù)的線程池 private ThreadPool() { this(5); } // 創(chuàng)建線程池,worker_num為線程池中工作線程的個(gè)數(shù) private ThreadPool(int worker_num) { ThreadPool.worker_num = worker_num; workThrads = new WorkThread[worker_num]; for (int i = 0; i < worker_num; i++) { workThrads[i] = new WorkThread(); workThrads[i].start();// 開(kāi)啟線程池中的線程 } } // 單態(tài)模式,獲得一個(gè)默認(rèn)線程個(gè)數(shù)的線程池 public static ThreadPool getThreadPool() { return getThreadPool(ThreadPool.worker_num); } // 單態(tài)模式,獲得一個(gè)指定線程個(gè)數(shù)的線程池,worker_num(>0)為線程池中工作線程的個(gè)數(shù) // worker_num<=0創(chuàng)建默認(rèn)的工作線程個(gè)數(shù) public static ThreadPool getThreadPool(int worker_num1) { if (worker_num1 <= 0) worker_num1 = ThreadPool.worker_num; if (threadPool == null) threadPool = new ThreadPool(worker_num1); return threadPool; } // 執(zhí)行任務(wù),其實(shí)只是把任務(wù)加入任務(wù)隊(duì)列,什么時(shí)候執(zhí)行有線程池管理器覺(jué)定 public void execute(Runnable task) { synchronized (taskQueue) { taskQueue.add(task); taskQueue.notify(); } } // 批量執(zhí)行任務(wù),其實(shí)只是把任務(wù)加入任務(wù)隊(duì)列,什么時(shí)候執(zhí)行有線程池管理器覺(jué)定 public void execute(Runnable[] task) { synchronized (taskQueue) { for (Runnable t : task) taskQueue.add(t); taskQueue.notify(); } } // 批量執(zhí)行任務(wù),其實(shí)只是把任務(wù)加入任務(wù)隊(duì)列,什么時(shí)候執(zhí)行有線程池管理器覺(jué)決定 public void execute(List<Runnable> task) { synchronized (taskQueue) { for (Runnable t : task) taskQueue.add(t); taskQueue.notify(); } } // 銷(xiāo)毀線程池,該方法保證在所有任務(wù)都完成的情況下才銷(xiāo)毀所有線程,否則等待任務(wù)完成才銷(xiāo)毀 public void destroy() { while (!taskQueue.isEmpty()) {// 如果還有任務(wù)沒(méi)執(zhí)行完成,就先睡會(huì)吧 try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } // 工作線程停止工作,且置為null for (int i = 0; i < worker_num; i++) { workThrads[i].stopWorker(); workThrads[i] = null; } threadPool=null; taskQueue.clear();// 清空任務(wù)隊(duì)列 } // 返回工作線程的個(gè)數(shù) public int getWorkThreadNumber() { return worker_num; } // 返回已完成任務(wù)的個(gè)數(shù),這里的已完成是只出了任務(wù)隊(duì)列的任務(wù)個(gè)數(shù),可能該任務(wù)并沒(méi)有實(shí)際執(zhí)行完成 public int getFinishedTasknumber() { return finished_task; } // 返回任務(wù)隊(duì)列的長(zhǎng)度,即還沒(méi)處理的任務(wù)個(gè)數(shù) public int getWaitTasknumber() { return taskQueue.size(); } // 覆蓋toString方法,返回線程池信息:工作線程個(gè)數(shù)和已完成任務(wù)個(gè)數(shù) @Override public String toString() { return "WorkThread number:" + worker_num + " finished task number:" + finished_task + " wait task number:" + getWaitTasknumber(); } /** * 內(nèi)部類(lèi),工作線程 */ private class WorkThread extends Thread { // 該工作線程是否有效,用于結(jié)束該工作線程 private boolean isRunning = true; /* * 關(guān)鍵所在啊,如果任務(wù)隊(duì)列不空,則取出任務(wù)執(zhí)行,若任務(wù)隊(duì)列空,則等待 */ @Override public void run() { Runnable r = null; while (isRunning) {// 注意,若線程無(wú)效則自然結(jié)束run方法,該線程就沒(méi)用了 synchronized (taskQueue) { while (isRunning && taskQueue.isEmpty()) {// 隊(duì)列為空 try { taskQueue.wait(20); } catch (InterruptedException e) { e.printStackTrace(); } } if (!taskQueue.isEmpty()) r = taskQueue.remove(0);// 取出任務(wù) } if (r != null) { r.run();// 執(zhí)行任務(wù) } finished_task++; r = null; } } // 停止工作,讓該線程自然執(zhí)行完run方法,自然結(jié)束 public void stopWorker() { isRunning = false; } } }測(cè)試一下:
public class TestThreadPool { public static void main(String[] args) { // 創(chuàng)建3個(gè)線程的線程池 ThreadPool t = ThreadPool.getThreadPool(3); t.execute(new Runnable[] { new Task(), new Task(), new Task() }); t.execute(new Runnable[] { new Task(), new Task(), new Task() }); System.out.println(t); t.destroy();// 所有線程都執(zhí)行完成才destory System.out.println(t); } // 任務(wù)類(lèi) static class Task implements Runnable { private static volatile int i = 1; @Override public void run() {// 執(zhí)行任務(wù) System.out.println("任務(wù) " + (i++) + " 完成"); } } }運(yùn)行結(jié)果:
WorkThread number:3 finished task number:0 wait task number:6 任務(wù) 1 完成 任務(wù) 2 完成 任務(wù) 3 完成 任務(wù) 4 完成 任務(wù) 5 完成 任務(wù) 6 完成 WorkThread number:3 finished task number:6 wait task number:03、Java5提供的線程池
3.1 緩存線程池(newCachedThreadPool)
緩存線程池(newCachedThreadPool)可以創(chuàng)建任意個(gè)線程,每個(gè)任務(wù)過(guò)來(lái)后都會(huì)創(chuàng)建一個(gè)線程,用于任務(wù)少,或執(zhí)行時(shí)間短的任務(wù),例如我們創(chuàng)建十個(gè)任務(wù),那么緩沖線程池將會(huì)創(chuàng)建十個(gè)線程來(lái)執(zhí)行。如下代碼:
ExecutorService threadPool = Executors.newCachedThreadPool(); for(int i=1; i<=10; i++){final int taskId = i; threadPool.execute(new Runnable(){ public void run() { for(int i=1; i<=10; i++){ System.out.println(Thread.currentThread().getName() + " is looping of " + i + " the task is " + taskId); try { Thread.sleep(20); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }); } System.out.println("add all of 10 task"); threadPool.shutdown();3.2 固定數(shù)量線程池(newFixedThreadPool)
固定數(shù)量線程池(newFixedThreadPool)允許我們創(chuàng)建固定線程數(shù)量的線程池,如果任務(wù)數(shù)大于線程池中線程的數(shù)量,那么任務(wù)將等待,如下代碼:
ExecutorService threadPool = Executors.newFixedThreadPool(3); for(int i=1; i<=10; i++){ final int taskId = i; threadPool.execute(new Runnable(){ public void run() { for(int i=1; i<=10; i++){ System.out.println(Thread.currentThread().getName() + " is looping of " + i + " the task is " + taskId); try { Thread.sleep(20); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }); } System.out.println("add all of 10 task"); threadPool.shutdown();3.4 單一線程池(newSingleThreadExecutor)
如何實(shí)現(xiàn)線程掛掉后重新啟動(dòng)?創(chuàng)建單一的線程池。
newSingleThreadExecutor(),這樣線程池中只會(huì)有一個(gè)線程工作,當(dāng)線程失敗后會(huì)重新創(chuàng)建一個(gè)線程將失敗的線程替換掉。
3.5 定時(shí)器線程池(scheduleAtFixedRate)
定時(shí)器線程池(scheduleAtFixedRate)與定時(shí)器很類(lèi)似,可以指定線程池中線程在多長(zhǎng)時(shí)間后執(zhí)行,以及每個(gè)多長(zhǎng)時(shí)間執(zhí)行一次,代碼如下,可以模擬讓炸彈在6s后爆炸,并且每個(gè)2s炸一次:
Executors.newScheduledThreadPool(3).scheduleAtFixedRate( // .schedule( new Runnable(){ public void run() { System.out.println("boming"); } }, 6, 2, TimeUnit.SECONDS); }總結(jié)
以上是生活随笔為你收集整理的【Java线程】线程池的原理和实现的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 【定时任务】Quartz用法详解
- 下一篇: 【Java并发编程】并发之痛 Threa