Linux线程池的设计
我設(shè)計這個線程池的初衷是為了與socket對接的。線程池的實現(xiàn)千變?nèi)f化,我得這個并不一定是最好的,但卻是否和我心目中需求模型的。現(xiàn)把部分設(shè)計思路和代碼貼出,以期拋磚引玉。個人比較喜歡搞開源,所以大家如果覺得有什么需要改善的地方,歡迎給予評論。思前想后,也沒啥設(shè)計圖能表達(dá)出設(shè)計思想,就把類圖貼出來吧。
類圖設(shè)計如下:
Command類是我們的業(yè)務(wù)類。這個類里只能存放簡單的內(nèi)置類型,這樣方便與socket的直接傳輸。我定義了一個cmd_成員用于存放命令字,arg_用于存放業(yè)務(wù)的參數(shù)。這個參數(shù)可以使用分隔符來分隔各個參數(shù)。我設(shè)計的只是簡單實現(xiàn),如果有序列化操作了,完全不需要使用我這種方法啦。
ThreadProcess就是業(yè)務(wù)處理類,這里邊定義了各個方法用于進(jìn)行業(yè)務(wù)處理,它將在ThreadPool中的Process函數(shù)中調(diào)用。
ThreadPool就是我們的線程池類。其中的成員變量都是靜態(tài)變量,Process就是線程處理函數(shù)。
#define MAX_THREAD_NUM 50 // 該值目前需要設(shè)定為初始線程數(shù)的整數(shù)倍
#define ADD_FACTOR 40 // 該值表示一個線程可以處理的最大任務(wù)數(shù)
#define THREAD_NUM 10 // 初始線程數(shù)
bshutdown_:用于線程退出。
command_:用于存放任務(wù)隊列
command_cond_:條件變量
command_mutex_:互斥鎖
icurr_thread_num_:當(dāng)前線程池中的線程數(shù)
thread_id_map_:這個map用于存放線程對應(yīng)的其它信息,我只存放了線程的狀態(tài),0為正常,1為退出。還可以定義其它的結(jié)構(gòu)來存放更多的信息,例如存放套接字。
InitializeThreads:用于初始化線程池,先創(chuàng)建THREAD_NUM個線程。后期擴(kuò)容也需要這個函數(shù)。
Process:線程處理函數(shù),這里邊會調(diào)用AddThread和DeleteThread在進(jìn)行線程池的伸縮。
AddWork:往隊列中添加一個任務(wù)。
ThreadDestroy:線程銷毀函數(shù)。
AddThread:擴(kuò)容THREAD_NUM個線程
DeleteThread:如果任務(wù)隊列為空,則將原來的線程池恢復(fù)到THREAD_NUM個。這里可以根據(jù)需要進(jìn)行修改。
?
以下貼出代碼以供大家參考。
command.h
#ifndef COMMAND_H_ #define COMMAND_H_class Command { public:int get_cmd();char* get_arg();void set_cmd(int cmd);void set_arg(char* arg); private:int cmd_;char arg_[65]; };#endif /* COMMAND_H_ */command.cpp
#include <string.h> #include "command.h"int Command::get_cmd() {return cmd_; }char* Command::get_arg() {return arg_; }void Command::set_cmd(int cmd) {cmd_ = cmd; }void Command::set_arg(char* arg) {if(NULL == arg){return;}strncpy(arg_,arg,64);arg_[64] = '\0'; }thread_process.h
#ifndef THREAD_PROCESS_H_ #define THREAD_PROCESS_H_class ThreadProcess { public:void Process0(void* arg);void Process1(void* arg);void Process2(void* arg); };#endif /* THREAD_PROCESS_H_ */thread_process.cpp
#include <pthread.h> #include <stdio.h> #include <unistd.h> #include "thread_process.h"void ThreadProcess::Process0(void* arg) {printf("thread %u is starting process %s\n",pthread_self(),arg);usleep(100*1000); } void ThreadProcess::Process1(void* arg) {printf("thread %u is starting process %s\n",pthread_self(),arg);usleep(100*1000); }void ThreadProcess::Process2(void* arg) {printf("thread %u is starting process %s\n",pthread_self(),arg);usleep(100*1000); }thread_pool.h
#ifndef THREAD_POOL_H_ #define THREAD_POOL_H_#include <map> #include <vector> #include "command.h"#define MAX_THREAD_NUM 50 // 該值目前需要設(shè)定為初始線程數(shù)的整數(shù)倍 #define ADD_FACTOR 40 // 該值表示一個線程可以處理的最大任務(wù)數(shù) #define THREAD_NUM 10 // 初始線程數(shù)class ThreadPool { public:ThreadPool() {};static void InitializeThreads();void AddWork(Command command);void ThreadDestroy(int iwait = 2); private:static void* Process(void* arg);static void AddThread();static void DeleteThread();static bool bshutdown_;static int icurr_thread_num_;static std::map<pthread_t,int> thread_id_map_;static std::vector<Command> command_;static pthread_mutex_t command_mutex_;static pthread_cond_t command_cond_; };#endif /* THREAD_POOL_H_ */thread_pool.cpp
#include <pthread.h> #include <stdlib.h> #include "thread_pool.h" #include "thread_process.h" #include "command.h"bool ThreadPool::bshutdown_ = false; int ThreadPool::icurr_thread_num_ = THREAD_NUM; std::vector<Command> ThreadPool::command_; std::map<pthread_t,int> ThreadPool::thread_id_map_; pthread_mutex_t ThreadPool::command_mutex_ = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t ThreadPool::command_cond_ = PTHREAD_COND_INITIALIZER;void ThreadPool::InitializeThreads() {for (int i = 0; i < THREAD_NUM ; ++i){pthread_t tempThread;pthread_create(&tempThread, NULL, ThreadPool::Process, NULL);thread_id_map_[tempThread] = 0;} }void* ThreadPool::Process(void* arg) {ThreadProcess threadprocess;Command command;while (true){pthread_mutex_lock(&command_mutex_);// 如果線程需要退出,則此時退出if (1 == thread_id_map_[pthread_self()]){pthread_mutex_unlock(&command_mutex_);printf("thread %u will exit\n", pthread_self());pthread_exit(NULL);}// 當(dāng)線程不需要退出且沒有需要處理的任務(wù)時,需要縮容的則縮容,不需要的則等待信號if (0 == command_.size() && !bshutdown_){if(MAX_THREAD_NUM != THREAD_NUM){DeleteThread();if (1 == thread_id_map_[pthread_self()]){pthread_mutex_unlock(&command_mutex_);printf("thread %u will exit\n", pthread_self());pthread_exit(NULL);}}pthread_cond_wait(&command_cond_,&command_mutex_);}// 線程池需要關(guān)閉,關(guān)閉已有的鎖,線程退出if(bshutdown_){pthread_mutex_unlock (&command_mutex_);printf ("thread %u will exit\n", pthread_self ());pthread_exit (NULL);}// 如果線程池的最大線程數(shù)不等于初始線程數(shù),則表明需要擴(kuò)容if(MAX_THREAD_NUM != THREAD_NUM){AddThread();}// 從容器中取出待辦任務(wù)std::vector<Command>::iterator iter = command_.begin();command.set_arg(iter->get_arg());command.set_cmd(iter->get_cmd());command_.erase(iter);pthread_mutex_unlock(&command_mutex_);// 開始業(yè)務(wù)處理switch(command.get_cmd()){case 0:threadprocess.Process0(command.get_arg());break;case 1:threadprocess.Process1(command.get_arg());break;case 2:threadprocess.Process2(command.get_arg());break;default:break;}}return NULL; // 完全為了消除警告(eclipse編寫的代碼,警告很煩人) }void ThreadPool::AddWork(Command command) {bool bsignal = false;pthread_mutex_lock(&command_mutex_);if (0 == command_.size()){bsignal = true;}command_.push_back(command);pthread_mutex_unlock(&command_mutex_);if (bsignal){pthread_cond_signal(&command_cond_);} }void ThreadPool::ThreadDestroy(int iwait) {while(0 != command_.size()){sleep(abs(iwait));}bshutdown_ = true;pthread_cond_broadcast(&command_cond_);std::map<pthread_t,int>::iterator iter = thread_id_map_.begin();for (; iter!=thread_id_map_.end(); ++iter){pthread_join(iter->first,NULL);}pthread_mutex_destroy(&command_mutex_);pthread_cond_destroy(&command_cond_); }void ThreadPool::AddThread() {if(((icurr_thread_num_*ADD_FACTOR) < command_.size())&& (MAX_THREAD_NUM != icurr_thread_num_)){InitializeThreads();icurr_thread_num_ += THREAD_NUM;} }void ThreadPool::DeleteThread() {int size = icurr_thread_num_ - THREAD_NUM;std::map<pthread_t,int>::iterator iter = thread_id_map_.begin();for(int i=0; i<size; ++i,++iter){iter->second = 1;} }main.cpp
#include "thread_pool.h" #include "command.h"int main() {ThreadPool thread_pool;thread_pool.InitializeThreads();Command command;char arg[8] = {0};for(int i=1; i<=1000; ++i){command.set_cmd(i%3);sprintf(arg,"%d",i);command.set_arg(arg);thread_pool.AddWork(command);}sleep(10); // 用于測試線程池縮容 thread_pool.ThreadDestroy();return 0; }?
代碼是按照google的開源c++編碼規(guī)范編寫。大家可以通過改變那幾個宏的值來調(diào)整線程池。有問題大家一起討論。
總結(jié)
以上是生活随笔為你收集整理的Linux线程池的设计的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 我对STL的一些看法(二)认识vecto
- 下一篇: Redis:02---安装Redis(L