日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > c/c++ >内容正文

c/c++

C++线程池原理及创建(转)

發(fā)布時(shí)間:2024/4/11 c/c++ 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 C++线程池原理及创建(转) 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

C++線程池原理及創(chuàng)建(轉(zhuǎn))

來自http://www.cnblogs.com/cpper-kaixuan/p/3640485.html

? ? ? 本文給出了一個(gè)通用的線程池框架,該框架將與線程執(zhí)行相關(guān)的任務(wù)進(jìn)行了高層次的抽象,使之與具體的執(zhí)行任務(wù)無關(guān)。另外該線程池具有動(dòng)態(tài)伸縮性,它能根據(jù)執(zhí)行任務(wù)的輕重自動(dòng)調(diào)整線程池中線程的數(shù)量。文章的最后,我們給出一個(gè)簡(jiǎn)單示例程序,通過該示例程序,我們會(huì)發(fā)現(xiàn),通過該線程池框架執(zhí)行多線程任務(wù)是多么的簡(jiǎn)單。

為什么需要線程池
? ? ? 目前的大多數(shù)網(wǎng)絡(luò)服務(wù)器,包括Web服務(wù)器、Email服務(wù)器以及數(shù)據(jù)庫服務(wù)器等都具有一個(gè)共同點(diǎn),就是單位時(shí)間內(nèi)必須處理數(shù)目巨大的連接請(qǐng)求,但處理時(shí)間卻相對(duì)較短。
傳統(tǒng)多線程方案中我們采用的服務(wù)器模型則是一旦接受到請(qǐng)求之后,即創(chuàng)建一個(gè)新的線程,由該線程執(zhí)行任務(wù)。任務(wù)執(zhí)行完畢后,線程退出,這就是是“即時(shí)創(chuàng)建,即時(shí)銷毀”的策略。盡管與創(chuàng)建進(jìn)程相比,創(chuàng)建線程的時(shí)間已經(jīng)大大的縮短,但是如果提交給線程的任務(wù)是執(zhí)行時(shí)間較短,而且執(zhí)行次數(shù)極其頻繁,那么服務(wù)器將處于不停的創(chuàng)建線程,銷毀線程的狀態(tài)。

我們將傳統(tǒng)方案中的線程執(zhí)行過程分為三個(gè)過程:T1、T2、T3。

T1:線程創(chuàng)建時(shí)間
T2:線程執(zhí)行時(shí)間,包括線程的同步等時(shí)間
T3:線程銷毀時(shí)間

那么我們可以看出,線程本身的開銷所占的比例為(T1+T3) / (T1+T2+T3)。如果線程執(zhí)行的時(shí)間很短的話,這比開銷可能占到20%-50%左右。如果任務(wù)執(zhí)行時(shí)間很頻繁的話,這筆開銷將是不可忽略的。

? ? ? 除此之外,線程池能夠減少創(chuàng)建的線程個(gè)數(shù)。通常線程池所允許的并發(fā)線程是有上界的,如果同時(shí)需要并發(fā)的線程數(shù)超過上界,那么一部分線程將會(huì)等待。而傳統(tǒng)方案中,如果同時(shí)請(qǐng)求數(shù)目為2000,那么最壞情況下,系統(tǒng)可能需要產(chǎn)生2000個(gè)線程。盡管這不是一個(gè)很大的數(shù)目,但是也有部分機(jī)器可能達(dá)不到這種要求。

? ? ? 因此線程池的出現(xiàn)正是著眼于減少線程池本身帶來的開銷。線程池采用預(yù)創(chuàng)建的技術(shù),在應(yīng)用程序啟動(dòng)之后,將立即創(chuàng)建一定數(shù)量的線程(N1),放入空閑隊(duì)列中。這些線程都是處于阻塞(Suspended)狀態(tài),不消耗CPU,但占用較小的內(nèi)存空間。當(dāng)任務(wù)到來后,緩沖池選擇一個(gè)空閑線程,把任務(wù)傳入此線程中運(yùn)行。當(dāng)N1個(gè)線程都在處理任務(wù)后,緩沖池自動(dòng)創(chuàng)建一定數(shù)量的新線程,用于處理更多的任務(wù)。在任務(wù)執(zhí)行完畢后線程也不退出,而是繼續(xù)保持在池中等待下一次的任務(wù)。當(dāng)系統(tǒng)比較空閑時(shí),大部分線程都一直處于暫停狀態(tài),線程池自動(dòng)銷毀一部分線程,回收系統(tǒng)資源。
? ? ? 基于這種預(yù)創(chuàng)建技術(shù),線程池將線程創(chuàng)建和銷毀本身所帶來的開銷分?jǐn)偟搅烁鱾€(gè)具體的任務(wù)上,執(zhí)行次數(shù)越多,每個(gè)任務(wù)所分擔(dān)到的線程本身開銷則越小,不過我們另外可能需要考慮進(jìn)去線程之間同步所帶來的開銷。

構(gòu)建線程池框架

一般線程池都必須具備下面幾個(gè)組成部分:
線程池管理器:用于創(chuàng)建并管理線程池
工作線程: 線程池中實(shí)際執(zhí)行的線程
任務(wù)接口: 盡管線程池大多數(shù)情況下是用來支持網(wǎng)絡(luò)服務(wù)器,但是我們將線程執(zhí)行的任務(wù)抽象出來,形成任務(wù)接口,從而是的線程池與具體的任務(wù)無關(guān)。
任務(wù)隊(duì)列:線程池的概念具體到實(shí)現(xiàn)則可能是隊(duì)列,鏈表之類的數(shù)據(jù)結(jié)構(gòu),其中保存執(zhí)行線程。

我們實(shí)現(xiàn)的通用線程池框架由五個(gè)重要部分組成CThreadManage,CThreadPool,CThread,CJob,CWorkerThread,除此之外框架中還包括線程同步使用的類CThreadMutex和CCondition。
CJob是所有的任務(wù)的基類,其提供一個(gè)接口Run,所有的任務(wù)類都必須從該類繼承,同時(shí)實(shí)現(xiàn)Run方法。該方法中實(shí)現(xiàn)具體的任務(wù)邏輯。
CThread是Linux中線程的包裝,其封裝了Linux線程最經(jīng)常使用的屬性和方法,它也是一個(gè)抽象類,是所有線程類的基類,具有一個(gè)接口Run。
CWorkerThread是實(shí)際被調(diào)度和執(zhí)行的線程類,其從CThread繼承而來,實(shí)現(xiàn)了CThread中的Run方法。
CThreadPool是線程池類,其負(fù)責(zé)保存線程,釋放線程以及調(diào)度線程。
CThreadManage是線程池與用戶的直接接口,其屏蔽了內(nèi)部的具體實(shí)現(xiàn)。
CThreadMutex用于線程之間的互斥。
CCondition則是條件變量的封裝,用于線程之間的同步。

?

線程池的時(shí)序很簡(jiǎn)單。CThreadManage直接跟客戶端打交道,其接受需要?jiǎng)?chuàng)建的線程初始個(gè)數(shù),并接受客戶端提交的任務(wù)。這兒的任務(wù)是具體的非抽象的任務(wù)。CThreadManage的內(nèi)部實(shí)際上調(diào)用的都是CThreadPool的相關(guān)操作。CThreadPool創(chuàng)建具體的線程,并把客戶端提交的任務(wù)分發(fā)給CWorkerThread,CWorkerThread實(shí)際執(zhí)行具體的任務(wù)。

理解系統(tǒng)組件

下面我們分開來了解系統(tǒng)中的各個(gè)組件。
CThreadManage
CThreadManage的功能非常簡(jiǎn)單,其提供最簡(jiǎn)單的方法,其類定義如下:

1 class CThreadManage 2 { 3 private: 4 CThreadPool* m_Pool; 5 int m_NumOfThread; 6 protected: 7 public: 8 void SetParallelNum(int num); 9 CThreadManage(); 10 CThreadManage(int num); 11 virtual ~CThreadManage(); 12 13 void Run(CJob* job,void* jobdata); 14 void TerminateAll(void); 15 };

其中m_Pool指向?qū)嶋H的線程池;m_NumOfThread是初始創(chuàng)建時(shí)候允許創(chuàng)建的并發(fā)的線程個(gè)數(shù)。另外Run和TerminateAll方法也非常簡(jiǎn)單,只是簡(jiǎn)單的調(diào)用CThreadPool的一些相關(guān)方法而已。其具體的實(shí)現(xiàn)如下:?

1 CThreadManage::CThreadManage(){ 2 m_NumOfThread = 10; 3 m_Pool = new CThreadPool(m_NumOfThread); 4 } 5 CThreadManage::CThreadManage(int num){ 6 m_NumOfThread = num; 7 m_Pool = new CThreadPool(m_NumOfThread); 8 } 9 CThreadManage::~CThreadManage(){ 10 if(NULL != m_Pool) 11 delete m_Pool; 12 } 13 void CThreadManage::SetParallelNum(int num){ 14 m_NumOfThread = num; 15 } 16 void CThreadManage::Run(CJob* job,void* jobdata){ 17 m_Pool->Run(job,jobdata); 18 } 19 void CThreadManage::TerminateAll(void){ 20 m_Pool->TerminateAll(); 21 }

CThread
CThread 類實(shí)現(xiàn)了對(duì)Linux中線程操作的封裝,它是所有線程的基類,也是一個(gè)抽象類,提供了一個(gè)抽象接口Run,所有的CThread都必須實(shí)現(xiàn)該Run方法。CThread的定義如下所示:

1 class CThread 2 { 3 private: 4 int m_ErrCode; 5 Semaphore m_ThreadSemaphore; //the inner semaphore, which is used to realize 6 unsigned long m_ThreadID; 7 bool m_Detach; //The thread is detached 8 bool m_CreateSuspended; //if suspend after creating 9 char* m_ThreadName; 10 ThreadState m_ThreadState; //the state of the thread 11 protected: 12 void SetErrcode(int errcode){m_ErrCode = errcode;} 13 static void* ThreadFunction(void*); 14 public: 15 CThread(); 16 CThread(bool createsuspended,bool detach); 17 virtual ~CThread(); 18 virtual void Run(void) = 0; 19 void SetThreadState(ThreadState state){m_ThreadState = state;} 20 21 bool Terminate(void); //Terminate the threa 22 bool Start(void); //Start to execute the thread 23 void Exit(void); 24 bool Wakeup(void); 25 26 ThreadState GetThreadState(void){return m_ThreadState;} 27 int GetLastError(void){return m_ErrCode;} 28 void SetThreadName(char* thrname){strcpy(m_ThreadName,thrname);} 29 char* GetThreadName(void){return m_ThreadName;} 30 int GetThreadID(void){return m_ThreadID;} 31 32 bool SetPriority(int priority); 33 int GetPriority(void); 34 int GetConcurrency(void); 35 void SetConcurrency(int num); 36 bool Detach(void); 37 bool Join(void); 38 bool Yield(void); 39 int Self(void); 40 };

線程的狀態(tài)可以分為四種,空閑、忙碌、掛起、終止(包括正常退出和非正常退出)。由于目前Linux線程庫不支持掛起操作,因此,我們的此處的掛起操作類似于暫停。如果線程創(chuàng)建后不想立即執(zhí)行任務(wù),那么我們可以將其“暫停”,如果需要運(yùn)行,則喚醒。有一點(diǎn)必須注意的是,一旦線程開始執(zhí)行任務(wù),將不能被掛起,其將一直執(zhí)行任務(wù)至完畢。
線程類的相關(guān)操作均十分簡(jiǎn)單。線程的執(zhí)行入口是從Start()函數(shù)開始,其將調(diào)用函數(shù)ThreadFunction,ThreadFunction再調(diào)用實(shí)際的Run函數(shù),執(zhí)行實(shí)際的任務(wù)。

CThreadPool
CThreadPool是線程的承載容器,一般可以將其實(shí)現(xiàn)為堆棧、單向隊(duì)列或者雙向隊(duì)列。在我們的系統(tǒng)中我們使用STL Vector對(duì)線程進(jìn)行保存。CThreadPool的實(shí)現(xiàn)代碼如下:

1 class CThreadPool 2 { 3 friend class CWorkerThread; 4 private: 5 unsigned int m_MaxNum; //the max thread num that can create at the same time 6 unsigned int m_AvailLow; //The min num of idle thread that shoule kept 7 unsigned int m_AvailHigh; //The max num of idle thread that kept at the same time 8 unsigned int m_AvailNum; //the normal thread num of idle num; 9 unsigned int m_InitNum; //Normal thread num; 10 protected: 11 CWorkerThread* GetIdleThread(void); 12 13 void AppendToIdleList(CWorkerThread* jobthread); 14 void MoveToBusyList(CWorkerThread* idlethread); 15 void MoveToIdleList(CWorkerThread* busythread); 16 17 void DeleteIdleThread(int num); 18 void CreateIdleThread(int num); 19 public: 20 CThreadMutex m_BusyMutex; //when visit busy list,use m_BusyMutex to lock and unlock 21 CThreadMutex m_IdleMutex; //when visit idle list,use m_IdleMutex to lock and unlock 22 CThreadMutex m_JobMutex; //when visit job list,use m_JobMutex to lock and unlock 23 CThreadMutex m_VarMutex; 24 25 CCondition m_BusyCond; //m_BusyCond is used to sync busy thread list 26 CCondition m_IdleCond; //m_IdleCond is used to sync idle thread list 27 CCondition m_IdleJobCond; //m_JobCond is used to sync job list 28 CCondition m_MaxNumCond; 29 30 vector<CWorkerThread*> m_ThreadList; 31 vector<CWorkerThread*> m_BusyList; //Thread List 32 vector<CWorkerThread*> m_IdleList; //Idle List 33 34 CThreadPool(); 35 CThreadPool(int initnum); 36 virtual ~CThreadPool(); 37 38 void SetMaxNum(int maxnum){m_MaxNum = maxnum;} 39 int GetMaxNum(void){return m_MaxNum;} 40 void SetAvailLowNum(int minnum){m_AvailLow = minnum;} 41 int GetAvailLowNum(void){return m_AvailLow;} 42 void SetAvailHighNum(int highnum){m_AvailHigh = highnum;} 43 int GetAvailHighNum(void){return m_AvailHigh;} 44 int GetActualAvailNum(void){return m_AvailNum;} 45 int GetAllNum(void){return m_ThreadList.size();} 46 int GetBusyNum(void){return m_BusyList.size();} 47 void SetInitNum(int initnum){m_InitNum = initnum;} 48 int GetInitNum(void){return m_InitNum;} 49 50 void TerminateAll(void); 51 void Run(CJob* job,void* jobdata); 52 }; 53 CThreadPool::CThreadPool() 54 { 55 m_MaxNum = 50; 56 m_AvailLow = 5; 57 m_InitNum=m_AvailNum = 10 ; 58 m_AvailHigh = 20; 59 60 m_BusyList.clear(); 61 m_IdleList.clear(); 62 for(int i=0;i<m_InitNum;i++){ 63 CWorkerThread* thr = new CWorkerThread(); 64 thr->SetThreadPool(this); 65 AppendToIdleList(thr); 66 thr->Start(); 67 } 68 } 69 70 CThreadPool::CThreadPool(int initnum) 71 { 72 assert(initnum>0 && initnum<=30); 73 m_MaxNum = 30; 74 m_AvailLow = initnum-10>0?initnum-10:3; 75 m_InitNum=m_AvailNum = initnum ; 76 m_AvailHigh = initnum+10; 77 78 m_BusyList.clear(); 79 m_IdleList.clear(); 80 for(int i=0;i<m_InitNum;i++){ 81 CWorkerThread* thr = new CWorkerThread(); 82 AppendToIdleList(thr); 83 thr->SetThreadPool(this); 84 thr->Start(); //begin the thread,the thread wait for job 85 } 86 } 87 88 CThreadPool::~CThreadPool() 89 { 90 TerminateAll(); 91 } 92 93 void CThreadPool::TerminateAll() 94 { 95 for(int i=0;i < m_ThreadList.size();i++) { 96 CWorkerThread* thr = m_ThreadList[i]; 97 thr->Join(); 98 } 99 return; 100 } 101 102 CWorkerThread* CThreadPool::GetIdleThread(void) 103 { 104 while(m_IdleList.size() ==0 ) 105 m_IdleCond.Wait(); 106 107 m_IdleMutex.Lock(); 108 if(m_IdleList.size() > 0 ) 109 { 110 CWorkerThread* thr = (CWorkerThread*)m_IdleList.front(); 111 printf("Get Idle thread %dn",thr->GetThreadID()); 112 m_IdleMutex.Unlock(); 113 return thr; 114 } 115 m_IdleMutex.Unlock(); 116 117 return NULL; 118 } 119 120 //add an idle thread to idle list 121 void CThreadPool::AppendToIdleList(CWorkerThread* jobthread) 122 { 123 m_IdleMutex.Lock(); 124 m_IdleList.push_back(jobthread); 125 m_ThreadList.push_back(jobthread); 126 m_IdleMutex.Unlock(); 127 } 128 129 //move and idle thread to busy thread 130 void CThreadPool::MoveToBusyList(CWorkerThread* idlethread) 131 { 132 m_BusyMutex.Lock(); 133 m_BusyList.push_back(idlethread); 134 m_AvailNum--; 135 m_BusyMutex.Unlock(); 136 137 m_IdleMutex.Lock(); 138 vector<CWorkerThread*>::iterator pos; 139 pos = find(m_IdleList.begin(),m_IdleList.end(),idlethread); 140 if(pos !=m_IdleList.end()) 141 m_IdleList.erase(pos); 142 m_IdleMutex.Unlock(); 143 } 144 145 void CThreadPool::MoveToIdleList(CWorkerThread* busythread) 146 { 147 m_IdleMutex.Lock(); 148 m_IdleList.push_back(busythread); 149 m_AvailNum++; 150 m_IdleMutex.Unlock(); 151 152 m_BusyMutex.Lock(); 153 vector<CWorkerThread*>::iterator pos; 154 pos = find(m_BusyList.begin(),m_BusyList.end(),busythread); 155 if(pos!=m_BusyList.end()) 156 m_BusyList.erase(pos); 157 m_BusyMutex.Unlock(); 158 159 m_IdleCond.Signal(); 160 m_MaxNumCond.Signal(); 161 } 162 163 //create num idle thread and put them to idlelist 164 void CThreadPool::CreateIdleThread(int num) 165 { 166 for(int i=0;i<num;i++){ 167 CWorkerThread* thr = new CWorkerThread(); 168 thr->SetThreadPool(this); 169 AppendToIdleList(thr); 170 m_VarMutex.Lock(); 171 m_AvailNum++; 172 m_VarMutex.Unlock(); 173 thr->Start(); //begin the thread,the thread wait for job 174 } 175 } 176 177 void CThreadPool::DeleteIdleThread(int num) 178 { 179 printf("Enter into CThreadPool::DeleteIdleThreadn"); 180 m_IdleMutex.Lock(); 181 printf("Delete Num is %dn",num); 182 for(int i=0;i<num;i++){ 183 CWorkerThread* thr; 184 if(m_IdleList.size() > 0 ){ 185 thr = (CWorkerThread*)m_IdleList.front(); 186 printf("Get Idle thread %dn",thr->GetThreadID()); 187 } 188 189 vector<CWorkerThread*>::iterator pos; 190 pos = find(m_IdleList.begin(),m_IdleList.end(),thr); 191 if(pos!=m_IdleList.end()) 192 m_IdleList.erase(pos); 193 m_AvailNum--; 194 printf("The idle thread available num:%d n",m_AvailNum); 195 printf("The idlelist num:%d n",m_IdleList.size()); 196 } 197 m_IdleMutex.Unlock(); 198 } 199 void CThreadPool::Run(CJob* job,void* jobdata) 200 { 201 assert(job!=NULL); 202 203 //if the busy thread num adds to m_MaxNum,so we should wait 204 if(GetBusyNum() == m_MaxNum) 205 m_MaxNumCond.Wait(); 206 207 if(m_IdleList.size()<m_AvailLow) 208 { 209 if(GetAllNum()+m_InitNum-m_IdleList.size() < m_MaxNum ) 210 CreateIdleThread(m_InitNum-m_IdleList.size()); 211 else 212 CreateIdleThread(m_MaxNum-GetAllNum()); 213 } 214 215 CWorkerThread* idlethr = GetIdleThread(); 216 if(idlethr !=NULL) 217 { 218 idlethr->m_WorkMutex.Lock(); 219 MoveToBusyList(idlethr); 220 idlethr->SetThreadPool(this); 221 job->SetWorkThread(idlethr); 222 printf("Job is set to thread %d n",idlethr->GetThreadID()); 223 idlethr->SetJob(job,jobdata); 224 } 225 }

在CThreadPool中存在兩個(gè)鏈表,一個(gè)是空閑鏈表,一個(gè)是忙碌鏈表。Idle鏈表中存放所有的空閑進(jìn)程,當(dāng)線程執(zhí)行任務(wù)時(shí)候,其狀態(tài)變?yōu)槊β禒顟B(tài),同時(shí)從空閑鏈表中刪除,并移至忙碌鏈表中。在CThreadPool的構(gòu)造函數(shù)中,我們將執(zhí)行下面的代碼:?

1 for(int i=0;i<m_InitNum;i++) 2 { 3 CWorkerThread* thr = new CWorkerThread(); 4 AppendToIdleList(thr); 5 thr->SetThreadPool(this); 6 thr->Start(); //begin the thread,the thread wait for job 7 }

在該代碼中,我們將創(chuàng)建m_InitNum個(gè)線程,創(chuàng)建之后即調(diào)用AppendToIdleList放入Idle鏈表中,由于目前沒有任務(wù)分發(fā)給這些線程,因此線程執(zhí)行Start后將自己掛起。
事實(shí)上,線程池中容納的線程數(shù)目并不是一成不變的,其會(huì)根據(jù)執(zhí)行負(fù)載進(jìn)行自動(dòng)伸縮。為此在CThreadPool中設(shè)定四個(gè)變量:
m_InitNum:處世創(chuàng)建時(shí)線程池中的線程的個(gè)數(shù)。
m_MaxNum:當(dāng)前線程池中所允許并發(fā)存在的線程的最大數(shù)目。
m_AvailLow:當(dāng)前線程池中所允許存在的空閑線程的最小數(shù)目,如果空閑數(shù)目低于該值,表明負(fù)載可能過重,此時(shí)有必要增加空閑線程池的數(shù)目。實(shí)現(xiàn)中我們總是將線程調(diào)整為m_InitNum個(gè)。
m_AvailHigh:當(dāng)前線程池中所允許的空閑的線程的最大數(shù)目,如果空閑數(shù)目高于該值,表明當(dāng)前負(fù)載可能較輕,此時(shí)將刪除多余的空閑線程,刪除后調(diào)整數(shù)也為m_InitNum個(gè)。
m_AvailNum:目前線程池中實(shí)際存在的線程的個(gè)數(shù),其值介于m_AvailHigh和m_AvailLow之間。如果線程的個(gè)數(shù)始終維持在m_AvailLow和m_AvailHigh之間,則線程既不需要?jiǎng)?chuàng)建,也不需要?jiǎng)h除,保持平衡狀態(tài)。因此如何設(shè)定m_AvailLow和m_AvailHigh的值,使得線程池最大可能的保持平衡態(tài),是線程池設(shè)計(jì)必須考慮的問題。
線程池在接受到新的任務(wù)之后,線程池首先要檢查是否有足夠的空閑池可用。檢查分為三個(gè)步驟:
? ? ? (1)檢查當(dāng)前處于忙碌狀態(tài)的線程是否達(dá)到了設(shè)定的最大值m_MaxNum,如果達(dá)到了,表明目前沒有空閑線程可用,而且也不能創(chuàng)建新的線程,因此必須等待直到有線程執(zhí)行完畢返回到空閑隊(duì)列中。
? ? ? (2)如果當(dāng)前的空閑線程數(shù)目小于我們?cè)O(shè)定的最小的空閑數(shù)目m_AvailLow,則我們必須創(chuàng)建新的線程,默認(rèn)情況下,創(chuàng)建后的線程數(shù)目應(yīng)該為m_InitNum,因此創(chuàng)建的線程數(shù)目應(yīng)該為( 當(dāng)前空閑線程數(shù)與m_InitNum);但是有一種特殊情況必須考慮,就是現(xiàn)有的線程總數(shù)加上創(chuàng)建后的線程數(shù)可能超過m_MaxNum,因此我們必須對(duì)線程的創(chuàng)建區(qū)別對(duì)待。

1 if(GetAllNum()+m_InitNum-m_IdleList.size() < m_MaxNum ) 2 CreateIdleThread(m_InitNum-m_IdleList.size()); 3 else 4 CreateIdleThread(m_MaxNum-GetAllNum());

如果創(chuàng)建后總數(shù)不超過m_MaxNum,則創(chuàng)建后的線程為m_InitNum;如果超過了,則只創(chuàng)建( m_MaxNum-當(dāng)前線程總數(shù) )個(gè)。
? ? ? (3)調(diào)用GetIdleThread方法查找空閑線程。如果當(dāng)前沒有空閑線程,則掛起;否則將任務(wù)指派給該線程,同時(shí)將其移入忙碌隊(duì)列。
當(dāng)線程執(zhí)行完畢后,其會(huì)調(diào)用MoveToIdleList方法移入空閑鏈表中,其中還調(diào)用m_IdleCond.Signal()方法,喚醒GetIdleThread()中可能阻塞的線程。

CWorkerThread
CWorkerThread是CThread的派生類,是事實(shí)上的工作線程。在CThreadPool的構(gòu)造函數(shù)中,我們創(chuàng)建了一定數(shù)量的CWorkerThread。一旦這些線程創(chuàng)建完畢,我們將調(diào)用Start()啟動(dòng)該線程。Start方法最終會(huì)調(diào)用Run方法。Run方法是個(gè)無限循環(huán)的過程。在沒有接受到實(shí)際的任務(wù)的時(shí)候,m_Job為NULL,此時(shí)線程將調(diào)用Wait方法進(jìn)行等待,從而處于掛起狀態(tài)。一旦線程池將具體的任務(wù)分發(fā)給該線程,其將被喚醒,從而通知線程從掛起的地方繼續(xù)執(zhí)行。CWorkerThread的完整定義如下:

1 class CWorkerThread:public CThread 2 { 3 private: 4 CThreadPool* m_ThreadPool; 5 CJob* m_Job; 6 void* m_JobData; 7 8 CThreadMutex m_VarMutex; 9 bool m_IsEnd; 10 protected: 11 public: 12 CCondition m_JobCond; 13 CThreadMutex m_WorkMutex; 14 CWorkerThread(); 15 virtual ~CWorkerThread(); 16 void Run(); 17 void SetJob(CJob* job,void* jobdata); 18 CJob* GetJob(void){return m_Job;} 19 void SetThreadPool(CThreadPool* thrpool); 20 CThreadPool* GetThreadPool(void){return m_ThreadPool;} 21 }; 22 CWorkerThread::CWorkerThread() 23 { 24 m_Job = NULL; 25 m_JobData = NULL; 26 m_ThreadPool = NULL; 27 m_IsEnd = false; 28 } 29 CWorkerThread::~CWorkerThread() 30 { 31 if(NULL != m_Job) 32 delete m_Job; 33 if(m_ThreadPool != NULL) 34 delete m_ThreadPool; 35 } 36 37 void CWorkerThread::Run() 38 { 39 SetThreadState(THREAD_RUNNING); 40 for(;;) 41 { 42 while(m_Job == NULL) 43 m_JobCond.Wait(); 44 45 m_Job->Run(m_JobData); 46 m_Job->SetWorkThread(NULL); 47 m_Job = NULL; 48 m_ThreadPool->MoveToIdleList(this); 49 if(m_ThreadPool->m_IdleList.size() > m_ThreadPool->GetAvailHighNum()) 50 { 51 m_ThreadPool->DeleteIdleThread(m_ThreadPool->m_IdleList.size()-m_T 52 hreadPool->GetInitNum()); 53 } 54 m_WorkMutex.Unlock(); 55 } 56 } 57 void CWorkerThread::SetJob(CJob* job,void* jobdata) 58 { 59 m_VarMutex.Lock(); 60 m_Job = job; 61 m_JobData = jobdata; 62 job->SetWorkThread(this); 63 m_VarMutex.Unlock(); 64 m_JobCond.Signal(); 65 } 66 void CWorkerThread::SetThreadPool(CThreadPool* thrpool) 67 { 68 m_VarMutex.Lock(); 69 m_ThreadPool = thrpool; 70 m_VarMutex.Unlock(); 71 }

? ? ? 當(dāng)線程執(zhí)行任務(wù)之前首先必須判斷空閑線程的數(shù)目是否低于m_AvailLow,如果低于,則必須創(chuàng)建足夠的空閑線程,使其數(shù)目達(dá)到m_InitNum個(gè),然后將調(diào)用MoveToBusyList()移出空閑隊(duì)列,移入忙碌隊(duì)列。當(dāng)任務(wù)執(zhí)行完畢后,其又調(diào)用MoveToIdleList()移出忙碌隊(duì)列,移入空閑隊(duì)列,等待新的任務(wù)。
? ? ? 除了Run方法之外,CWorkerThread中另外一個(gè)重要的方法就是SetJob,該方法將實(shí)際的任務(wù)賦值給線程。當(dāng)沒有任何執(zhí)行任務(wù)即m_Job為NULL的時(shí)候,線程將調(diào)用m_JobCond.Wait進(jìn)行等待。一旦Job被賦值給線程,其將調(diào)用m_JobCond.Signal方法喚醒該線程。由于m_JobCond屬于線程內(nèi)部的變量,每個(gè)線程都維持一個(gè)m_JobCond,只有得到任務(wù)的線程才被喚醒,沒有得到任務(wù)的將繼續(xù)等待。無論一個(gè)線程何時(shí)被喚醒,其都將從等待的地方繼續(xù)執(zhí)行m_Job->Run(m_JobData),這是線程執(zhí)行實(shí)際任務(wù)的地方。
? ? ? 在線程執(zhí)行給定Job期間,我們必須防止另外一個(gè)Job又賦給該線程,因此在賦值之前,通過m_VarMutex進(jìn)行鎖定, Job執(zhí)行期間,其于的Job將不能關(guān)聯(lián)到該線程;任務(wù)執(zhí)行完畢,我們調(diào)用m_VarMutex.Unlock()進(jìn)行解鎖,此時(shí),線程又可以接受新的執(zhí)行任務(wù)。
在線程執(zhí)行任務(wù)結(jié)束后返回空閑隊(duì)列前,我們還需要判斷當(dāng)前空閑隊(duì)列中的線程是否高于m_AvailHigh個(gè)。如果超過m_AvailHigh,則必須從其中刪除(m_ThreadPool->m_IdleList.size()-m_ThreadPool->GetInitNum())個(gè)線程,使線程數(shù)目保持在m_InitNum個(gè)。

CJob
CJob類相對(duì)簡(jiǎn)單,其封裝了任務(wù)的基本的屬性和方法,其中最重要的是Run方法,代碼如下:

1 class CJob 2 { 3 private: 4 int m_JobNo; //The num was assigned to the job 5 char* m_JobName; //The job name 6 CThread *m_pWorkThread; //The thread associated with the job 7 public: 8 CJob( void ); 9 virtual ~CJob(); 10 11 int GetJobNo(void) const { return m_JobNo; } 12 void SetJobNo(int jobno){ m_JobNo = jobno;} 13 char* GetJobName(void) const { return m_JobName; } 14 void SetJobName(char* jobname); 15 CThread *GetWorkThread(void){ return m_pWorkThread; } 16 void SetWorkThread ( CThread *pWorkThread ){ 17 m_pWorkThread = pWorkThread; 18 } 19 virtual void Run ( void *ptr ) = 0; 20 }; 21 CJob::CJob(void) 22 :m_pWorkThread(NULL) 23 ,m_JobNo(0) 24 ,m_JobName(NULL) 25 { 26 } 27 CJob::~CJob(){ 28 if(NULL != m_JobName) 29 free(m_JobName); 30 } 31 void CJob::SetJobName(char* jobname) 32 { 33 if(NULL !=m_JobName) { 34 free(m_JobName); 35 m_JobName = NULL; 36 } 37 if(NULL !=jobname) { 38 m_JobName = (char*)malloc(strlen(jobname)+1); 39 strcpy(m_JobName,jobname); 40 } 41 }

線程池使用示例
至此我們給出了一個(gè)簡(jiǎn)單的與具體任務(wù)無關(guān)的線程池框架。使用該框架非常的簡(jiǎn)單,我們所需要的做的就是派生CJob類,將需要完成的任務(wù)實(shí)現(xiàn)在Run方法中。然后將該Job交由CThreadManage去執(zhí)行。下面我們給出一個(gè)簡(jiǎn)單的示例程序

1 class CXJob:public CJob 2 { 3 public: 4 CXJob(){i=0;} 5 ~CXJob(){} 6 void Run(void* jobdata) { 7 printf("The Job comes from CXJOB\n"); 8 sleep(2); 9 } 10 }; 11 12 class CYJob:public CJob 13 { 14 public: 15 CYJob(){i=0;} 16 ~CYJob(){} 17 void Run(void* jobdata) { 18 printf("The Job comes from CYJob\n"); 19 } 20 }; 21 22 main() 23 { 24 CThreadManage* manage = new CThreadManage(10); 25 for(int i=0;i<40;i++) 26 { 27 CXJob* job = new CXJob(); 28 manage->Run(job,NULL); 29 } 30 sleep(2); 31 CYJob* job = new CYJob(); 32 manage->Run(job,NULL); 33 manage->TerminateAll(); 34 }

CXJob和CYJob都是從Job類繼承而來,其都實(shí)現(xiàn)了Run接口。CXJob只是簡(jiǎn)單的打印一句”The Job comes from CXJob”,CYJob也只打印”The Job comes from CYJob”,然后均休眠2秒鐘。在主程序中我們初始創(chuàng)建10個(gè)工作線程。然后分別執(zhí)行40次CXJob和一次CYJob。

線程池使用后記

線程池適合場(chǎng)合:
事實(shí)上,線程池并不是萬能的。它有其特定的使用場(chǎng)合。線程池致力于減少線程本身的開銷對(duì)應(yīng)用所產(chǎn)生的影響,這是有前提的,前提就是線程本身開銷與線程執(zhí)行任務(wù)相比不可忽略。如果線程本身的開銷相對(duì)于線程任務(wù)執(zhí)行開銷而言是可以忽略不計(jì)的,那么此時(shí)線程池所帶來的好處是不明顯的,比如對(duì)于FTP服務(wù)器以及Telnet服務(wù)器,通常傳送文件的時(shí)間較長,開銷較大,那么此時(shí),我們采用線程池未必是理想的方法,我們可以選擇“即時(shí)創(chuàng)建,即時(shí)銷毀”的策略。
總之線程池通常適合下面的幾個(gè)場(chǎng)合:
(1) 單位時(shí)間內(nèi)處理任務(wù)頻繁而且任務(wù)處理時(shí)間短
(2) 對(duì)實(shí)時(shí)性要求較高。如果接受到任務(wù)后在創(chuàng)建線程,可能滿足不了實(shí)時(shí)要求,因此必須采用線程池進(jìn)行預(yù)創(chuàng)建。
(3) 必須經(jīng)常面對(duì)高突發(fā)性事件,比如Web服務(wù)器,如果有足球轉(zhuǎn)播,則服務(wù)器將產(chǎn)生巨大的沖擊。此時(shí)如果采取傳統(tǒng)方法,則必須不停的大量產(chǎn)生線程,銷毀線程。此時(shí)采用動(dòng)態(tài)線程池可以避免這種情況的發(fā)生。

結(jié)束語
本文給出了一個(gè)簡(jiǎn)單的通用的與任務(wù)無關(guān)的線程池的實(shí)現(xiàn),通過該線程池能夠極大的簡(jiǎn)化Linux下多線程的開發(fā)工作。該線程池的進(jìn)一步完善開發(fā)工作還在進(jìn)行中,希望能夠得到你的建議和支持。
參考資料
http://www-900.ibm.com/developerWorks/cn/java/j-jtp0730/index.shtml
POSIX多線程程序設(shè)計(jì),David R.Butenhof 譯者:于磊 曾剛,中國電力出版社
C++面向?qū)ο蠖嗑€程編程,CAMERON HUGHES等著 周良忠譯,人民郵電出版社
Java Pro,結(jié)合線程和分析器池,Edy Yu

總結(jié)

以上是生活随笔為你收集整理的C++线程池原理及创建(转)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。