线程池的分析与实现
在需要頻繁開(kāi)線程時(shí),創(chuàng)建和銷毀線程會(huì)話費(fèi)大量時(shí)間,為了提高效率,我們可以在任務(wù)開(kāi)始前,先創(chuàng)建一定數(shù)量的線程。這樣在接收到任務(wù)時(shí),就可以直接使用線程池中處于wait狀態(tài)的線程,在任務(wù)結(jié)束后線程回到wait狀態(tài),等待新任務(wù)的到來(lái),這就避免了線程的創(chuàng)建與銷毀,從而提高程序執(zhí)行效率。
所需數(shù)據(jù)
- 需要存儲(chǔ)有多少線程( int thread_number )
- 需要開(kāi)辟對(duì)應(yīng)的數(shù)組,存儲(chǔ)線程號(hào)( pthread_t *threads )
- 需要一個(gè)任務(wù)隊(duì)列來(lái)存儲(chǔ)未執(zhí)行的任務(wù),便于線程競(jìng)爭(zhēng)任務(wù)并執(zhí)行( task_queue )
- 需要一個(gè)flag來(lái)標(biāo)記線程池是否結(jié)束,該標(biāo)記可以在線程池結(jié)束后喚醒所有處于等待線程的線程,讓它們可以正常退出(其中,所有線程處于脫離(detach)狀態(tài))
- 互斥鎖與條件變量,用于避免在獲取與添加任務(wù)時(shí)發(fā)生錯(cuò)誤(同步與互斥)
運(yùn)行流程
- 執(zhí)行ThreadPool的構(gòu)造函數(shù),初始化有關(guān)數(shù)據(jù),進(jìn)行線程的創(chuàng)建,并將線程進(jìn)行脫離(使線程在運(yùn)行完后可以自動(dòng)回收)
- 創(chuàng)建的線程會(huì)去執(zhí)行工作線程,如果任務(wù)隊(duì)列為空,則一直while循環(huán)等待,直到被喚醒(signal)去競(jìng)爭(zhēng)任務(wù)。若競(jìng)爭(zhēng)到任務(wù),則去執(zhí)行,執(zhí)行完后操作與前面相同;若沒(méi)有競(jìng)爭(zhēng)到任務(wù),則回到wait狀態(tài)
- 有新任務(wù)到來(lái),接收到信號(hào)的程序會(huì)給線程池的任務(wù)隊(duì)列添加信息,線程池便會(huì)喚醒處于wait的線程執(zhí)行任務(wù),或直接被剛執(zhí)行完任務(wù)的線程繼續(xù)執(zhí)行(由于是任務(wù)隊(duì)列,所以只有處于隊(duì)首的任務(wù)會(huì)被執(zhí)行)
代碼實(shí)現(xiàn)(c++,后面會(huì)有對(duì)代碼的分析)
源代碼請(qǐng)看我的Github
封裝線程池的類
// threadPool.h#ifndef _THREADPOOL_H_ #define _THREADPOOL_H_#include "locker.h" // 該頭文件見(jiàn)文章最后 或 Github #include <queue> using namespace std;template< typename T > class ThreadPool { private:int thread_number; // 線程池的線程數(shù)pthread_t *threads; // 線程數(shù)組queue<T *> task_queue; // 任務(wù)隊(duì)列MutexLocker queue_mutex_locker; // 任務(wù)隊(duì)列的互斥鎖Cond queue_cond_locker; // 任務(wù)隊(duì)列的條件變量bool m_stop; // 是否結(jié)束線程 public:ThreadPool( int thread_num = 20 );~ThreadPool();bool append( T *task ); // 向任務(wù)隊(duì)列添加任務(wù) private:static void *worker( void * ); // 工作線程void run(); // 線程池中線程開(kāi)始運(yùn)行的函數(shù)T *getTask(); // 從任務(wù)隊(duì)列中獲取隊(duì)首的任務(wù) };template< typename T > ThreadPool<T>::ThreadPool( int thread_num ) :thread_number(thread_num), threads(NULL), m_stop(false) {if( thread_number < 0 ) {cout << "thread_number < 0\n";throw exception();}// 創(chuàng)建數(shù)組存放線程號(hào)threads = new pthread_t[ thread_number ];if( !threads ) {cout << "threads is NULL\n";throw exception();}// 創(chuàng)建規(guī)定數(shù)量的線程for( int i = 0; i < thread_number; i++ ) {// 由于pthread_create第三個(gè)參數(shù)必須指向靜態(tài)函數(shù),要使用類成員函數(shù)和變量,只能通過(guò):// 1) 類的靜態(tài)對(duì)象// 2) 將類的對(duì)象作為參數(shù)傳給靜態(tài)函數(shù)// 這里通過(guò)第二種方法實(shí)現(xiàn)if( pthread_create( &threads[i], NULL, worker, this ) ) { // 成功返回0delete[] threads; // 創(chuàng)建失敗則釋放所有已分配的空間cout << "pthread_create error\n";throw exception();}// 將線程進(jìn)行脫離,線程運(yùn)行完后自動(dòng)回收,避免使用主線程進(jìn)行join等待其結(jié)束if( pthread_detach( threads[i] ) ) {delete[] threads;cout << "pthread_detach error\n";throw exception();}} }// 析構(gòu)函數(shù)中,將m_stop置true,此時(shí)將阻塞中的所有線程喚醒 // 由于 !m_stop 為false,線程會(huì)退出循環(huán),線程結(jié)束被回收( 詳見(jiàn)函數(shù)run() ) // 若不喚醒線程,則在程序退出后,線程非正常結(jié)束,同時(shí)會(huì)導(dǎo)致 template< typename T > ThreadPool<T>::~ThreadPool() {delete[] threads;m_stop = true;queue_cond_locker.broadcast(); }/* 添加任務(wù)時(shí)需要先上鎖,并判斷隊(duì)列是否為空 */ template< typename T > bool ThreadPool<T>::append( T *task ) {queue_mutex_locker.mutex_lock();bool need_signal = task_queue.empty(); // 記錄添加任務(wù)之前隊(duì)列是否為空task_queue.push( task );queue_mutex_locker.mutex_unlock();// 如果添加任務(wù)之前隊(duì)列為空,即所有線程都在wait,所以需要喚醒某個(gè)線程if( need_signal ) {queue_cond_locker.signal();}return true; }// 線程函數(shù),調(diào)用run()來(lái)使線程開(kāi)始工作 template< typename T > void * ThreadPool<T>::worker( void *arg ) {ThreadPool *pool = ( ThreadPool * )arg;pool->run();return pool; }// 獲取處于隊(duì)首的任務(wù),獲取時(shí)需要加鎖,避免發(fā)生錯(cuò)誤 // 若隊(duì)列為空,則返回NULL,該線程成為等待狀態(tài)(詳見(jiàn)函數(shù)run()) template< typename T > T* ThreadPool<T>::getTask() {T *task = NULL;queue_mutex_locker.mutex_lock();if( !task_queue.empty() ) {task = task_queue.front();task_queue.pop();}queue_mutex_locker.mutex_unlock();return task; }template< typename T > void ThreadPool<T>::run() {while( !m_stop ) { // 當(dāng)線程池沒(méi)有結(jié)束時(shí),線程循環(huán)獲取任務(wù)進(jìn)行執(zhí)行T *task = getTask();if( !task ) {queue_cond_locker.wait(); // 隊(duì)列為空,線程開(kāi)始等待} else {task->doit(); // 開(kāi)始執(zhí)行任務(wù)delete task; //task指向的對(duì)象在WebServer中new出來(lái),因此需要手動(dòng)delete}} }#endif代碼分析
首先需要注意,threadpool的實(shí)現(xiàn)使用了模板類,這樣就需要把類的定義與成員函數(shù)的實(shí)現(xiàn)放在一個(gè)文件里,因?yàn)槌蓡T函數(shù)不能單獨(dú)編譯。
關(guān)于m_stop的作用:可以發(fā)現(xiàn)m_stop的初值為false,這樣線程就可以一直循環(huán)等待執(zhí)行任務(wù)( 見(jiàn)ThreadPool::run()成員函數(shù) ),但是m_stop的值改變的位置在析構(gòu)函數(shù)中,線程池都退出了,改變m_stop的值還有什么用呢?其實(shí),在將m_stop的值改變后,又調(diào)用了broadcast()來(lái)喚醒所有線程,這樣所有被阻塞的線程就會(huì)開(kāi)始執(zhí)行,run()方法的循環(huán)條件是while( !m_stop ),這樣其循環(huán)會(huì)被破壞,進(jìn)而線程結(jié)束,被自動(dòng)回收。// PS:然而只要主線程(暫且就這樣叫了)退出( 不是通過(guò)pthread_exit()退出 ),那么所有線程就會(huì)被強(qiáng)制終止,同樣也不會(huì)正常退出,所以線程池析構(gòu)函數(shù)中喚醒所有線程并不會(huì)有什么作用,可能被喚醒的線程仍會(huì)在主線程結(jié)束前未執(zhí)行完,仍為非正常退出。可以通過(guò)sleep()來(lái)環(huán)節(jié),不過(guò)這樣仍不會(huì)對(duì)正在執(zhí)行任務(wù)的線程有很好的作用,因?yàn)槿蝿?wù)執(zhí)行時(shí)間未知。(可以考慮在析構(gòu)函數(shù)中使用join()等待線程結(jié)束 )
在任務(wù)隊(duì)列進(jìn)行添加任務(wù)、取任務(wù)、刪除任務(wù)時(shí),為了避免多個(gè)線程對(duì)任務(wù)隊(duì)列同時(shí)操作,需要在進(jìn)行修改時(shí)將任務(wù)隊(duì)列加鎖
由于pthread_create第三個(gè)參數(shù)必須指向靜態(tài)函數(shù),要使用類成員函數(shù)和變量,只能通過(guò):
1) 類的靜態(tài)對(duì)象
2) 將類的對(duì)象作為參數(shù)傳給靜態(tài)函數(shù)
這里通過(guò)第二種方法實(shí)現(xiàn),將對(duì)象本身作為參數(shù)傳到線程工作函數(shù),在線程函數(shù)中進(jìn)行類型強(qiáng)轉(zhuǎn)獲取調(diào)用對(duì)象中存儲(chǔ)的數(shù)據(jù)。在從任務(wù)隊(duì)列獲取任務(wù)時(shí):如果任務(wù)隊(duì)列非空,就加鎖獲得隊(duì)首任務(wù),再將隊(duì)首出隊(duì),將任務(wù)返回即可;如果任務(wù)隊(duì)列為空,即無(wú)法獲取任務(wù),就會(huì)返回NULL,那么run()函數(shù)對(duì)應(yīng)就會(huì)進(jìn)入else后的語(yǔ)句,將線程進(jìn)行阻塞,等待下一個(gè)任務(wù)的到來(lái)。
關(guān)于在run() 中 delete task;的說(shuō)明:由于所有任務(wù)都是webServer類中使用new創(chuàng)建的(詳見(jiàn)我的Github),因?yàn)槿绻贿m用new的話,在將任務(wù)加入隊(duì)列后,webServer.cpp中任務(wù)會(huì)出作用域,進(jìn)行析構(gòu),那么任務(wù)可能會(huì)在執(zhí)行完之前被析構(gòu),造成錯(cuò)誤,所以使用new創(chuàng)建。然而,new創(chuàng)建的對(duì)象不會(huì)自動(dòng)釋放,只能手動(dòng)delete,因此在任務(wù)結(jié)束后進(jìn)行delete,這樣才能使其資源釋放,同事可以調(diào)用Task的析構(gòu)函數(shù),關(guān)閉該任務(wù)對(duì)應(yīng)的連接。
附:頭文件locker.h
封裝互斥鎖與條件變量的類
// locker.h #ifndef _LOCKER_H_ #define _LOCKER_H_#include <iostream> #include <exception> #include <pthread.h> using namespace std;/* 線程鎖 */ class MutexLocker { private:pthread_mutex_t m_mutex; public:MutexLocker() { //初始化if( pthread_mutex_init( &m_mutex, NULL ) ) {cout << "mutex init errror __ 1\n";throw exception();}}~MutexLocker() {pthread_mutex_destroy( &m_mutex );}bool mutex_lock() {return pthread_mutex_lock( &m_mutex ) == 0;}bool mutex_unlock() {return pthread_mutex_unlock( &m_mutex );} };/* 條件變量 */ class Cond { private:pthread_mutex_t m_mutex;pthread_cond_t m_cond; public:Cond() {if( pthread_mutex_init( &m_mutex, NULL ) ) {throw exception();}if( pthread_cond_init( &m_cond, NULL ) ) {pthread_cond_destroy( &m_cond );throw exception();}}~Cond() {pthread_mutex_destroy( &m_mutex );pthread_cond_destroy( &m_cond );}// 等待條件變量,cond與mutex搭配使用,避免造成共享數(shù)據(jù)的混亂bool wait() {pthread_mutex_lock( &m_mutex );int ret = pthread_cond_wait( &m_cond, &m_mutex );pthread_mutex_unlock( &m_mutex );return ret == 0;}// 喚醒等待該條件變量的某個(gè)線程bool signal() {return pthread_cond_signal( &m_cond ) == 0;}// 喚醒所有等待該條件變量的線程bool broadcast() {return pthread_cond_broadcast( &m_cond ) == 0;} };#endif總結(jié)
- 上一篇: 移动单页面中有哪些好的办法判断所有图片加
- 下一篇: 基于epoll+threadpool的w