【Linux系统编程】线程池
00. 目錄
文章目錄
- 00. 目錄
- 01. 線程池原理
- 02. 線程池應用實例
- 03. 線程池代碼
- 04. 附錄
01. 線程池原理
在傳統服務器結構中,常是有一個總的監聽線程監聽有沒有新的用戶連接服務器,每當有一個新的用戶進入,服務器就開啟一個新的線程用戶處理這 個用戶的數據包。這個線程只服務于這個用戶,當用戶與服務器端關閉連接以后,服務器端銷毀這個線程。
然而頻繁地開辟與銷毀線程極大地占用了系統的資源,而且在大量用戶的情況下,系統為了開辟和銷毀線程將浪費大量的時間和資源。線程池提供了一個解決外部大量用戶與服務器有限資源的矛盾。
線程池和傳統的一個用戶對應一個線程的處理方法不同,它的基本思想就是在程序開始時就在內存中開辟一些線程,線程的數目是固定的,他們獨自形成一個類,屏蔽了對外的操作,而服務器只需要將數據包交給線程池就可以了。當有新的客戶請求到達時,不是新創建一個線程為其服務,而是從“池子”中選擇一個空閑的線程為新的客戶請求服務,服務完畢后,線程進入空閑線程池中。如果沒有線程空閑的話,就將數據包暫時積累, 等待線程池內有線程空閑以后再進行處理。通過對多個任務重用已經存在的線程對象,降低了對線程對象創建和銷毀的開銷。當客戶請求 時,線程對象已經存在,可以提高請求的響應時間,從而整體地提高了系統服務的表現。
02. 線程池應用實例
一般來說實現一個線程池主要包括以下幾個組成部分:
1)線程管理器:用于創建并管理線程池。
2)工作線程:線程池中實際執行任務的線程。在初始化線程時會預先創建好固定數目的線程在池中,這些初始化的線程一般處于空閑狀態,一般不占用 CPU,占用較小的內存空間。
3)任務接口:每個任務必須實現的接口,當線程池的任務隊列中有可執行任務時,被空閑的工作線程調去執行(線程的閑與忙是通過互斥量實現的),把任務抽象出來形成接口,可以做到線程池與具體的任務無關。
4)任務隊列:用來存放沒有處理的任務,提供一種緩沖機制,實現這種結構有好幾種方法,常用的是隊列,主要運用先進先出原理,另外一種是鏈表之類的數據結構,可以動態的為它分配內存空間,應用中比較靈活,此教程就是用到的鏈表。
**什么時候需要創建線程池呢?**簡單的說,如果一個應用需要頻繁的創建和銷毀線程,而任務執行的時間又非常短,這樣線程創建和銷毀的帶來的開銷就不容忽視,這時也是線程池該出場的機會了。如果線程創建和銷毀時間相比任務執行時間可以忽略不計,則沒有必要使用線程池了。
03. 線程池代碼
thread_pool.h的示例代碼
#ifndef __THREAD_POOL_H__ #define __THREAD_POOL_H__#include <pthread.h>/********************************************************************* * 任務回調函數,也可根據需要自行修改 *********************************************************************/ typedef void *(*pool_task_f)(void *arg);/********************************************************************* * 任務句柄 *********************************************************************/ typedef struct _task{pool_task_f process;/*回調函數,任務運行時會調用此函數,注意也可聲明成其它形式*/void *arg; /*回調函數的參數*/struct _task *next; }pool_task;/********************************************************************* * 線程池句柄 *********************************************************************/ typedef struct {pthread_t *threadid; /* 線程號 */int threads_limit; /* 線程池中允許的活動線程數目 */int destroy_flag; /* 是否銷毀線程池 , 0銷毀,1不銷毀*/pool_task *queue_head; /* 鏈表結構,線程池中所有等待任務 */int task_in_queue; /* 當前等待隊列的任務數目 */pthread_mutex_t queue_lock; /* 鎖 */pthread_cond_t queue_ready; /* 條件變量 */ }pool_t;/********************************************************************* *功能: 初始化線程池結構體并創建線程 *參數: pool:線程池句柄threads_limit:線程池中線程的數量 *返回值: 無 *********************************************************************/ void pool_init(pool_t *pool, int threads_limit);/********************************************************************* *功能: 銷毀線程池,等待隊列中的任務不會再被執行,但是正在運行的線程會一直,把任務運行完后再退出 *參數: 線程池句柄 *返回值: 成功:0,失敗非0 *********************************************************************/ int pool_uninit(pool_t *pool);/********************************************************************* *功能: 向線程池中添加一個任務 *參數: pool:線程池句柄process:任務處理函數arg:任務參數 *返回值: 0 *********************************************************************/ int pool_add_task(pool_t *pool, pool_task_f process, void *arg);#endifthread_pool.c 的示例代碼
#include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <assert.h>#include "thread_pool.h"static void *pool_thread_server(void *arg);/********************************************************************* *功能: 初始化線程池結構體并創建線程 *參數: pool:線程池句柄threads_limit:線程池中線程的數量 *返回值: 無 *********************************************************************/ void pool_init(pool_t *pool, int threads_limit) {pool->threads_limit = threads_limit;pool->queue_head = NULL;pool->task_in_queue = 0;pool->destroy_flag = 0;/*創建存放線程ID的空間*/pool->threadid = (pthread_t *)calloc(threads_limit, sizeof(pthread_t));int i = 0;/*初始化互斥鎖和條件變量*/pthread_mutex_init(&(pool->queue_lock), NULL);pthread_cond_init(&(pool->queue_ready), NULL);/*循環創建threads_limit個線程*/for (i = 0; i < threads_limit; i++){pthread_create(&(pool->threadid[i]), NULL, pool_thread_server, pool);}return; }/********************************************************************* *功能: 銷毀線程池,等待隊列中的任務不會再被執行,但是正在運行的線程會一直,把任務運行完后再退出 *參數: 線程池句柄 *返回值: 成功:0,失敗非0 *********************************************************************/ int pool_uninit(pool_t *pool) {pool_task *head = NULL;int i;pthread_mutex_lock(&(pool->queue_lock));if(pool->destroy_flag)/* 防止兩次調用 */return -1;pool->destroy_flag = 1;pthread_mutex_unlock(&(pool->queue_lock));/* 喚醒所有等待線程,線程池要銷毀了 */pthread_cond_broadcast(&(pool->queue_ready));/* 阻塞等待線程退出,否則就成僵尸了 */for (i = 0; i < pool->threads_limit; i++)pthread_join(pool->threadid[i], NULL);free(pool->threadid);/* 銷毀等待隊列 */pthread_mutex_lock(&(pool->queue_lock));while(pool->queue_head != NULL){head = pool->queue_head;pool->queue_head = pool->queue_head->next;free(head);}pthread_mutex_unlock(&(pool->queue_lock));/*條件變量和互斥量也別忘了銷毀*/pthread_mutex_destroy(&(pool->queue_lock));pthread_cond_destroy(&(pool->queue_ready));return 0; }/********************************************************************* *功能: 向任務隊列中添加一個任務 *參數: pool:線程池句柄process:任務處理函數arg:任務參數 *返回值: 無 *********************************************************************/ static void enqueue_task(pool_t *pool, pool_task_f process, void *arg) {pool_task *task = NULL;pool_task *member = NULL;pthread_mutex_lock(&(pool->queue_lock));if(pool->task_in_queue >= pool->threads_limit){printf("task_in_queue > threads_limit!\n");pthread_mutex_unlock (&(pool->queue_lock));return;}task = (pool_task *)calloc(1, sizeof(pool_task));assert(task != NULL);task->process = process;task->arg = arg;task->next = NULL;pool->task_in_queue++;member = pool->queue_head;if(member != NULL){while(member->next != NULL) /* 將任務加入到任務鏈連的最后位置. */member = member->next;member->next = task;}else{pool->queue_head = task; /* 如果是第一個任務的話,就指向頭 */}printf("\ttasks %d\n", pool->task_in_queue);/* 等待隊列中有任務了,喚醒一個等待線程 */pthread_cond_signal (&(pool->queue_ready));pthread_mutex_unlock (&(pool->queue_lock)); }/********************************************************************* *功能: 從任務隊列中取出一個任務 *參數: 線程池句柄 *返回值: 任務句柄 *********************************************************************/ static pool_task *dequeue_task(pool_t *pool) {pool_task *task = NULL;pthread_mutex_lock(&(pool->queue_lock));/* 判斷線程池是否要銷毀了 */if(pool->destroy_flag){pthread_mutex_unlock(&(pool->queue_lock));printf("thread 0x%lx will be destroyed\n", pthread_self());pthread_exit(NULL);}/* 如果等待隊列為0并且不銷毀線程池,則處于阻塞狀態 */if(pool->task_in_queue == 0){while((pool->task_in_queue == 0) && (!pool->destroy_flag)){printf("thread 0x%lx is waitting\n", pthread_self());/* 注意:pthread_cond_wait是一個原子操作,等待前會解鎖,喚醒后會加鎖 */pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));}}else{/* 等待隊列長度減去1,并取出隊列中的第一個元素 */pool->task_in_queue--;task = pool->queue_head;pool->queue_head = task->next;printf("thread 0x%lx received a task\n", pthread_self());}pthread_mutex_unlock(&(pool->queue_lock));return task; }/********************************************************************* *功能: 向線程池中添加一個任務 *參數: pool:線程池句柄process:任務處理函數arg:任務參數 *返回值: 0 *********************************************************************/ int pool_add_task(pool_t *pool, pool_task_f process, void *arg) {enqueue_task(pool, process, arg);return 0; }/********************************************************************* *功能: 線程池服務程序 *參數: 略 *返回值: 略 *********************************************************************/ static void *pool_thread_server(void *arg) {pool_t *pool = NULL;pool = (pool_t *)arg;while(1){pool_task *task = NULL;task = dequeue_task(pool);/*調用回調函數,執行任務*/if(task != NULL){printf ("thread 0x%lx is busy\n", pthread_self());task->process(task->arg);free(task);task = NULL;}}/*這一句應該是不可達的*/pthread_exit(NULL); return NULL; }測試代碼:
#include <stdio.h> #include <unistd.h>#include "thread_pool.h"void *task_test(void *arg) {printf("\t\tworking on task %d\n", (int)arg);sleep(1); /*休息一秒,延長任務的執行時間*/return NULL; }void thread_pool_demo(void) {pool_t pool;int i = 0;pool_init(&pool, 2);//初始化一個線程池,其中創建2個線程sleep(1);for(i = 0; i < 5; i++){sleep(1);pool_add_task(&pool, task_test, (void *)i);//添加一個任務}sleep(4);pool_uninit(&pool);//刪除線程池 }int main (int argc, char *argv[]) { thread_pool_demo();return 0; }04. 附錄
總結
以上是生活随笔為你收集整理的【Linux系统编程】线程池的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【Linux系统编程】线程私有数据
- 下一篇: 【Linux系统编程】同步和互斥的概念