操作系统实验报告15:进程同步与互斥线程池
操作系統(tǒng)實(shí)驗(yàn)報(bào)告15
實(shí)驗(yàn)內(nèi)容
- 實(shí)驗(yàn)內(nèi)容:進(jìn)程同步。
- 內(nèi)容1:編譯運(yùn)行課件 Lecture18 例程代碼。
- Algorithms 18-1 ~ 18-9.
- 內(nèi)容2:在 Lab Week 13 的基礎(chǔ)上用信號(hào)量解決線程池分配的互斥問題。
- 編譯、運(yùn)行、測試用例。
- 提交新的設(shè)計(jì)報(bào)告
- 內(nèi)容1:編譯運(yùn)行課件 Lecture18 例程代碼。
實(shí)驗(yàn)環(huán)境
- 架構(gòu):Intel x86_64 (虛擬機(jī))
- 操作系統(tǒng):Ubuntu 20.04
- 匯編器:gas (GNU Assembler) in AT&T mode
- 編譯器:gcc
技術(shù)日志
內(nèi)容1:編譯運(yùn)行課件 Lecture18 例程代碼
實(shí)驗(yàn)內(nèi)容原理:
- Linux版本
- 在版本2.6之前,Linux為非搶占內(nèi)核,即使有一個(gè)更高優(yōu)先級(jí)的進(jìn)程能夠運(yùn)行,它也不能搶占在內(nèi)核模式下運(yùn)行的其它進(jìn)程。
- 版本2.6及更高版本,Linux內(nèi)核是完全可搶占的。這樣在內(nèi)核態(tài)下運(yùn)行的任務(wù)也能被搶占。
- Linux在內(nèi)核中提供了幾種不同的同步機(jī)制:
- __sync_fetch_類型
- 自旋鎖
- 互斥鎖
- 信號(hào)量
- 自旋鎖和信號(hào)量的讀者-寫者版本。
- 在單CPU系統(tǒng)上,自旋鎖被啟用和禁用內(nèi)核搶占取代。
gcc __sync_系列原子操作函數(shù)
// 將value加到*ptr上,結(jié)果更新到*ptr,并返回操作之前*ptr的值 type __sync_fetch_and_add (type *ptr, type value); // 從*ptr減去value,結(jié)果更新到*ptr,并返回操作之前*ptr的值 type __sync_fetch_and_sub (type *ptr, type value, ...) // 將*ptr與value相或,結(jié)果更新到*ptr, 并返回操作之前*ptr的值 type __sync_fetch_and_or (type *ptr, type value, ...) // 將*ptr與value相與,結(jié)果更新到*ptr,并返回操作之前*ptr的值 type __sync_fetch_and_and (type *ptr, type value, ...) // 將*ptr與value異或,結(jié)果更新到*ptr,并返回操作之前*ptr的值 type __sync_fetch_and_xor (type *ptr, type value, ...) // 將*ptr取反后,與value相與,結(jié)果更新到*ptr,并返回操作之前*ptr的值 type __sync_fetch_and_nand (type *ptr, type value, ...) // 將value加到*ptr上,結(jié)果更新到*ptr,并返回操作之后新*ptr的值 type __sync_add_and_fetch (type *ptr, type value, ...) // 從*ptr減去value,結(jié)果更新到*ptr,并返回操作之后新*ptr的值 type __sync_sub_and_fetch (type *ptr, type value, ...) // 將*ptr與value相或, 結(jié)果更新到*ptr,并返回操作之后新*ptr的值 type __sync_or_and_fetch (type *ptr, type value, ...) // 將*ptr與value相與,結(jié)果更新到*ptr,并返回操作之后新*ptr的值 type __sync_and_and_fetch (type *ptr, type value, ...) // 將*ptr與value異或,結(jié)果更新到*ptr,并返回操作之后新*ptr的值 type __sync_xor_and_fetch (type *ptr, type value, ...)// 將*ptr取反后,與value相與,結(jié)果更新到*ptr,并返回操作之后新*ptr的值 type __sync_nand_and_fetch (type *ptr, type value, ...) // 比較*ptr與oldval的值,如果兩者相等,則將newval更新到*ptr并返回true bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)// 比較*ptr與oldval的值,如果兩者相等,則將newval更新到*ptr并返回操作之前*ptr的值 type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...) // 發(fā)出完整內(nèi)存柵欄 __sync_synchronize (...) // 將value寫入ptr,對ptr加鎖,并返回操作之前ptr的值。 type __sync_lock_test_and_set (type ptr, type value, ...)// 將0寫入到ptr,并對*ptr解鎖。 void __sync_lock_release (type ptr, ...)其中type可以是類型uint8_t, unt16_t, uint32_t, unt64_t。
- 驗(yàn)證實(shí)驗(yàn)alg.18-1-syn-fetch-1.c
執(zhí)行程序命令:
gcc alg.18-1-syn-fetch-1.c ./a.out分析:
實(shí)現(xiàn)細(xì)節(jié)解釋:
一開始先讓變量i等于10,然后在同一條打印語句中打印函數(shù)__sync_fetch_and_add(&i, 20)的返回值和i的值,__sync_fetch_and_add(&i, 20)是無鎖化原子操作語句,實(shí)現(xiàn)的是先取值再加第二個(gè)參數(shù)即20的操作,返回操作前i的值,所以語句執(zhí)行后獲取到的值還是原來i的值,即是10,而在同一條打印語句中的i的值與函數(shù)__sync_fetch_and_add(&i, 20)無關(guān),所以i的值還是10,下一條語句還是打印i的值,此時(shí)已經(jīng)執(zhí)行完函數(shù)__sync_fetch_and_add(&i, 20),所以可以看到此時(shí)i的值為30。
接著繼續(xù)讓變量i等于10,然后在同一條打印語句中打印函數(shù)__sync_add_and_fetch(&i, 20)的返回值和i的值,__sync_add_and_fetch(&i, 20)是無鎖化原子操作語句,實(shí)現(xiàn)的是先加第二個(gè)參數(shù)即20再取值的操作,返回操作后i的值,所以語句執(zhí)行后獲取到的值還是加上20后i的值,即是30,而在同一條打印語句中的i的值與函數(shù)__sync_add_and_fetch(&i, 20)無關(guān),所以i的值還是10,下一條語句還是打印i的值,此時(shí)已經(jīng)執(zhí)行完函數(shù)__sync_add_and_fetch(&i, 20),所以可以看到此時(shí)i的值為30。
- 驗(yàn)證實(shí)驗(yàn)alg.18-1-syn-fetch-2.c
執(zhí)行程序命令:
gcc alg.18-1-syn-fetch-2.c -pthread ./a.out分析:
可以看到,每個(gè)線程在加1的時(shí)候,因?yàn)槭褂玫氖莀_sync_fetch_and_add()函數(shù),是原子化操作,所以沒有發(fā)生條件沖突而產(chǎn)生錯(cuò)誤的值,值為40*20000=800000,計(jì)算結(jié)果正確。
實(shí)現(xiàn)細(xì)節(jié)解釋:
一開始使用語句pthread_create(&ptid[i], NULL, &test_func, NULL)創(chuàng)建MAX_N即40個(gè)線程,每個(gè)線程的線程執(zhí)行函數(shù)都為:
void *test_func(void *arg) {for (int i = 0; i < 20000; ++i)__sync_fetch_and_add(&count, 1);/* count++; gave a wrong result */ return NULL; }線程執(zhí)行函數(shù)的作用為使用__sync_fetch_and_add(&count, 1)語句使全局靜態(tài)變量count加1加20000次。
回到主線程中,使用pthread_join(ptid[i], NULL)語句使主線程等待MAX_N即40個(gè)線程結(jié)束后再繼續(xù)運(yùn)行,最后打印count的值,值為40*20000=800000,因?yàn)開_sync_fetch_and_add()函數(shù)是原子化操作,避免了每個(gè)線程在count加1時(shí)發(fā)生條件沖突,這樣得到的結(jié)果也是無誤的,程序運(yùn)行正確。
- 驗(yàn)證實(shí)驗(yàn)alg.18-1-syn-fetch-3.cc
執(zhí)行程序命令:
gcc alg.18-1-syn-fetch-3.c -pthread ./a.out分析:
可以看到,每個(gè)線程在加1的時(shí)候,因?yàn)槭褂玫氖莄ount++語句,不是原子操作語句,所以產(chǎn)生了條件沖突而產(chǎn)生錯(cuò)誤的值,值不為40*20000=800000,而是694845, 計(jì)算結(jié)果錯(cuò)誤。
實(shí)現(xiàn)細(xì)節(jié)解釋:
和之前一個(gè)程序相比,這個(gè)程序在線程執(zhí)行函數(shù)中使用的是count++語句使全局靜態(tài)變量count加1加20000次,這樣因?yàn)槭褂玫牟皇窃硬僮髡Z句,分成從緩存取到寄存器中,寄存器加一,再存入緩存三步進(jìn)行,所以各個(gè)線程會(huì)很容易發(fā)生條件沖突,最后產(chǎn)生的是一個(gè)錯(cuò)的結(jié)果。
- 驗(yàn)證實(shí)驗(yàn)alg.18-2-syn-compare-test.c
執(zhí)行程序命令:
gcc alg.18-2-syn-compare-test.c ./a.out分析:
實(shí)現(xiàn)細(xì)節(jié)解釋:
第一個(gè)代碼片段中,value值為200000,oldval值為123456,newval值為654321,執(zhí)行語句__sync_bool_compare_and_swap(&value, oldval, newval),比較value與oldval的值,因?yàn)椴幌嗟?#xff0c;所以value保持原值,并返回false給ret,所以最后打印結(jié)果,ret為0,value為200000,oldval為123456,newval為654321。
第二個(gè)代碼片段中,value值為200000,oldval值為200000,newval值為654321,執(zhí)行語句__sync_bool_compare_and_swap(&value, oldval, newval),比較value與oldval的值,因?yàn)橄嗟?#xff0c;所以newval更新到value,并返回true給ret,所以最后打印結(jié)果,ret為1,value為654321,oldval為123456,newval為654321。
第三個(gè)代碼片段中,value值為200000,oldval值為123456,newval值為654321,執(zhí)行語句__sync_val_compare_and_swap(&value, oldval, newval),比較value與oldval的值,因?yàn)椴幌嗟?#xff0c;所以value保持原值,并返回操作之前value的值給ret,所以最后打印結(jié)果,ret為200000,value為200000,oldval為123456,newval為654321。
第四個(gè)代碼片段中,value值為200000,oldval值為200000,newval值為654321,執(zhí)行語句__sync_val_compare_and_swap(&value, oldval, newval),比較value與oldval的值,因?yàn)橄嗟?#xff0c;所以newval的值更新到value,并返回操作之前value的值給ret,所以最后打印結(jié)果,ret為200000,value為654321,oldval為200000,newval為654321。
第五個(gè)代碼片段中,value值為200000,newval值為654321,執(zhí)行語句__sync_lock_test_and_set(&value, newval),將newval寫入value,對value加鎖,并返回操作之前value的值,所以最后打印結(jié)果,ret為200000,value為654321,newval為654321。
第六個(gè)代碼片段中,value值為200000,執(zhí)行語句__sync_lock_release(&value),將0寫入到value,并對&value解鎖,所以最后打印結(jié)果,value為0。
POSIX互斥鎖
-
互斥鎖用于保護(hù)代碼的臨界區(qū),即線程在進(jìn)入臨界區(qū)之前獲取鎖,并在退出臨界區(qū)時(shí)釋放鎖。
-
Pthreads互斥鎖采用數(shù)據(jù)類型pthread_mutex_t。一個(gè)互斥鎖可以使用pthread_mutex_init()函數(shù)創(chuàng)建。
#include <pthread.h>pthread_mutex_t mutex;/* 創(chuàng)建并初始化這個(gè)互斥鎖 */ pthread_mutex_init(&mutex, NULL);- 第一個(gè)參數(shù)是指向互斥鎖的指針。第二個(gè)參數(shù)是NULL,表示將互斥鎖按照其默認(rèn)屬性初始化。
-
互斥鎖是通過pthread_mutex_lock()和pthread_mutex_unlock()函數(shù)來獲取和釋放的。如果調(diào)用pthread_mutex_lock()時(shí)互斥鎖不可用,則調(diào)用線程將被阻塞在等待隊(duì)列中,直到互斥鎖的所有者調(diào)用pthread_mutex_unlock()釋放互斥鎖為止。
-
以下代碼說明了如何使用互斥鎖保護(hù)臨界區(qū):
所有互斥函數(shù)當(dāng)操作正確時(shí)返回值為0,如果發(fā)生錯(cuò)誤,這些函數(shù)將返回非零錯(cuò)誤代碼。
- 驗(yàn)證實(shí)驗(yàn)alg.18-3-syn-pthread-mutex.c
執(zhí)行程序命令:
gcc alg.18-3-syn-pthread-mutex.c -pthread ./a.out ./a.out syn分析:
可以看到,當(dāng)編譯命令中沒有參數(shù)時(shí),得到的加法結(jié)果是一個(gè)錯(cuò)誤的結(jié)果;當(dāng)編譯命令中有參數(shù)syn時(shí),得到的加法結(jié)果是正確的結(jié)果800000。
實(shí)現(xiàn)細(xì)節(jié)解釋:
首先在全局中,使用pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER將pthread_mutex_t類型變量mutex使用宏定義PTHREAD_MUTEX_INITIALIZER進(jìn)行靜態(tài)初始化,或者在主函數(shù)中使用語句pthread_mutex_init (&mutex, NULL)進(jìn)行初始化。主函數(shù)最后會(huì)等待創(chuàng)建的線程都執(zhí)行完后再繼續(xù)進(jìn)行,然后使用pthread_mutex_destroy(&mutex)語句釋放互斥鎖,最后打印count結(jié)果。
當(dāng)程序的編譯命令參數(shù)是syn時(shí),程序創(chuàng)建MAX_N即40個(gè)線程,每個(gè)線程的執(zhí)行函數(shù)都為:
void *test_func_syn(void *arg) {for (int i = 0; i < 20000; ++i) {pthread_mutex_lock(&mutex);count++;pthread_mutex_unlock(&mutex);}pthread_exit(NULL); }在線程執(zhí)行函數(shù)中,有一個(gè)執(zhí)行20000次的for循環(huán),里面每次使count自增前得到一個(gè)互斥鎖,然后再令count自增,最后再釋放互斥鎖,這樣保證了線程之間不會(huì)出現(xiàn)競爭條件沖突,count的自增操作有序進(jìn)行,最后得到的也是正確結(jié)果800000。
當(dāng)程序的編譯命令沒有參數(shù)或參數(shù)不是syn時(shí),程序創(chuàng)建MAX_N即40個(gè)線程,每個(gè)線程的執(zhí)行函數(shù)都為:
void *test_func_asy(void *arg) {for (int i = 0; i < 20000; ++i) {count++;}pthread_exit(NULL); }在線程執(zhí)行函數(shù)中,有一個(gè)執(zhí)行20000次的for循環(huán),里面沒有使用互斥鎖而是直接讓count進(jìn)行自增,這樣容易發(fā)生條件沖突,最后得到的結(jié)果也并不正確613245。
POSIX信號(hào)量
- POSIX SEM 擴(kuò)展指定了兩種類型的信號(hào)量:命名信號(hào)量和無名信號(hào)量。從內(nèi)核的版本2.6開始,Linux系統(tǒng)提供對這兩種類型的支持。
- POSIX命名信號(hào)量
-
函數(shù)sem_open()用于創(chuàng)建新的或打開已經(jīng)存在的信號(hào)量:
#include <fcntl.h> #include <sys/stat.h> #include <semaphore.h> sem_t *sem_open(const char *name, int oflag); sem_t *sem_open(const char *name, int oflag, mode_t mode, unsigned int value); -
例如:
sem_t *sem; sem = sem_open("MYSEM", O_CREAT, 0666, 1);- 命名信號(hào)量MYSEM被創(chuàng)建并初始化為1。它對其他進(jìn)程具有讀寫訪問權(quán)限。
-
多個(gè)不相關(guān)的進(jìn)程可以簡單地通過引用信號(hào)量的名稱,使用一個(gè)通用的命名信號(hào)量作為同步機(jī)制。
-
在上面的示例中,一旦創(chuàng)建了信號(hào)量MYSEM,其他進(jìn)程隨后使用相同參數(shù)調(diào)用sem_open()時(shí),會(huì)將描述符sem返回給現(xiàn)有的信號(hào)量。POSIX分別聲明這些操作為sem_wait(sem)和sem_post(sem)。
-
下面說明如何使用上面創(chuàng)建的命名信號(hào)量保護(hù)臨界區(qū):
sem_wait(sem); /* 獲取信號(hào)量 */ 臨界區(qū) sem_post(sem); /* 釋放信號(hào)量 */ ... sem_close(sem);
-
- POSIX無名信號(hào)量
- 無名信號(hào)量是通過sem_init()函數(shù)進(jìn)行創(chuàng)建和初始化的,該函數(shù)傳遞了三個(gè)參數(shù):
(1)信號(hào)量的指針
(2)表示共享級(jí)別的標(biāo)志
(3)信號(hào)量的初始值int sem_init(sem_t *sem, int pshared, unsigned int value) - 例如:#include <semaphore.h> sem_t sem; sem_init(&sem, 0, 1); /* 創(chuàng)建信號(hào)量并將其初始化為1 */
- pshared = 0表示此信號(hào)量只能由屬于創(chuàng)建該信號(hào)量的同一進(jìn)程的線程共享。
- 信號(hào)量設(shè)置為值1。
- POSIX無名信號(hào)量對描述符sem也使用了與命名信號(hào)量相同的sem_wait(sem)和sem_post(sem)操作。
- 下面說明如何使用上面創(chuàng)建的無名信號(hào)量保護(hù)臨界區(qū):sem_wait(&sem); /* 獲取信號(hào)量 */ 臨界區(qū) sem_post(&sem); /* 釋放信號(hào)量 */ ... sem_destroy(&sem);
- 無名信號(hào)量是通過sem_init()函數(shù)進(jìn)行創(chuàng)建和初始化的,該函數(shù)傳遞了三個(gè)參數(shù):
通常在進(jìn)程間同步中使用命名信號(hào)量,而無名信號(hào)量用于線程間通信。
- 驗(yàn)證實(shí)驗(yàn)alg.18-4-syn-pthread-sem-unnamed.c
執(zhí)行程序命令:
gcc alg.18-4-syn-pthread-sem-unnamed.c -pthread ./a.out syn ./a.out分析:
可以看到,當(dāng)編譯命令中有參數(shù)syn時(shí),得到的加法結(jié)果是正確的結(jié)果800000;當(dāng)編譯命令中沒有參數(shù)時(shí),得到的加法結(jié)果是一個(gè)錯(cuò)誤的結(jié)果。
實(shí)現(xiàn)細(xì)節(jié)解釋:
首先在全局中,聲明一個(gè)信號(hào)量標(biāo)識(shí)符類型sem_t變量unnamed_sem,然后在主函數(shù)中使用語句sem_init(&unnamed_sem, 0, 1)創(chuàng)建無名信號(hào)量unnamed_sem并初始化為1。主函數(shù)最后會(huì)等待創(chuàng)建的線程都執(zhí)行完后再繼續(xù)進(jìn)行,然后打印count結(jié)果,最后使用sem_destroy(&unnamed_sem)語句銷毀信號(hào)量。
當(dāng)程序的編譯命令參數(shù)是syn時(shí),程序創(chuàng)建MAX_N即40個(gè)線程,每個(gè)線程的執(zhí)行函數(shù)都為:
void *test_func_syn(void *arg) {for (int i = 0; i < 20000; ++i) {sem_wait(&unnamed_sem);count++;sem_post(&unnamed_sem);}pthread_exit(NULL); }在線程執(zhí)行函數(shù)中,有一個(gè)執(zhí)行20000次的for循環(huán),里面每次使count自增前得到一個(gè)信號(hào)量,然后再令count自增,最后再釋放信號(hào)量,這樣保證了線程之間不會(huì)出現(xiàn)競爭條件沖突,count的自增操作有序進(jìn)行,最后得到的也是正確結(jié)果800000。
當(dāng)程序的編譯命令沒有參數(shù)或參數(shù)不是syn時(shí),程序創(chuàng)建MAX_N即40個(gè)線程,每個(gè)線程的執(zhí)行函數(shù)都為:
void *test_func_asy(void *arg) {for (int i = 0; i < 20000; ++i) {count++;}pthread_exit(NULL); }在線程執(zhí)行函數(shù)中,有一個(gè)執(zhí)行20000次的for循環(huán),里面沒有使用信號(hào)量而是直接讓count進(jìn)行自增,這樣容易發(fā)生條件沖突,最后得到的結(jié)果也并不正確632537。
- 驗(yàn)證實(shí)驗(yàn)alg.18-5-syn-pthread-sem-named.c
執(zhí)行程序命令:
gcc alg.18-5-syn-pthread-sem-named.c -pthread ./a.out syn ./a.out分析:
可以看到,當(dāng)編譯命令中有參數(shù)syn時(shí),得到的加法結(jié)果是正確的結(jié)果800000;當(dāng)編譯命令中沒有參數(shù)時(shí),得到的加法結(jié)果是一個(gè)錯(cuò)誤的結(jié)果。
實(shí)現(xiàn)細(xì)節(jié)解釋:
首先在全局中,聲明一個(gè)信號(hào)量標(biāo)識(shí)符類型sem_t *指針變量named_sem,然后在主函數(shù)中使用語句named_sem = sem_open("MYSEM", O_CREAT, 0666, 1)創(chuàng)建命名信號(hào)量MYSEM并初始化為1,并返回信號(hào)量標(biāo)識(shí)符給變量named_sem,這時(shí)一個(gè)名為sem.MYSEM的文件將會(huì)在/dev/shm/目錄下被創(chuàng)建,任何知道這個(gè)文件名的進(jìn)程和線程都可以共享這個(gè)信號(hào)量。
主函數(shù)最后會(huì)等待創(chuàng)建的線程都執(zhí)行完后再繼續(xù)進(jìn)行,然后打印count結(jié)果,接著使用sem_close(named_sem)語句關(guān)閉命名信號(hào)量,最后使用語句sem_unlink("MYSEM")從/dev/shm/目錄下移除sem.MYSEM文件當(dāng)其標(biāo)識(shí)符為0時(shí)。
當(dāng)程序的編譯命令參數(shù)是syn時(shí),程序創(chuàng)建MAX_N即40個(gè)線程,每個(gè)線程的執(zhí)行函數(shù)都為:
void *test_func_syn(void *arg) {for (int i = 0; i < 20000; ++i) {sem_wait(&unnamed_sem);count++;sem_post(&unnamed_sem);}pthread_exit(NULL); }在線程執(zhí)行函數(shù)中,有一個(gè)執(zhí)行20000次的for循環(huán),里面每次使count自增前得到一個(gè)信號(hào)量,然后再令count自增,最后再釋放信號(hào)量,這樣保證了線程之間不會(huì)出現(xiàn)競爭條件沖突,count的自增操作有序進(jìn)行,最后得到的也是正確結(jié)果800000。
當(dāng)程序的編譯命令沒有參數(shù)或參數(shù)不是syn時(shí),程序創(chuàng)建MAX_N即40個(gè)線程,每個(gè)線程的執(zhí)行函數(shù)都為:
void *test_func_asy(void *arg) {for (int i = 0; i < 20000; ++i) {count++;}pthread_exit(NULL); }在線程執(zhí)行函數(shù)中,有一個(gè)執(zhí)行20000次的for循環(huán),里面沒有使用信號(hào)量而是直接讓count進(jìn)行自增,這樣容易發(fā)生條件沖突,最后得到的結(jié)果也并不正確704064。
- 驗(yàn)證實(shí)驗(yàn)多生產(chǎn)者-多消費(fèi)者問題
執(zhí)行程序命令:
gcc alg.18-6-syn-pc-con-6.c -pthread gcc alg.18-7-syn-pc-producer-6.c -o alg.18-7-syn-pc-producer-6.o -pthread gcc alg.18-8-syn-pc-consumer-6.c -o alg.18-8-syn-pc-consumer-6.o -pthread ./a.out myshm 4 8 2 3分析:
緩沖區(qū)大小為4,生產(chǎn)項(xiàng)目數(shù)量為8,生產(chǎn)者數(shù)目為2,消費(fèi)者數(shù)量為3時(shí),生產(chǎn)和消費(fèi)的過程有序進(jìn)行,直到8個(gè)項(xiàng)目被從循環(huán)隊(duì)列中全部取出消費(fèi),程序結(jié)束。
實(shí)現(xiàn)細(xì)節(jié)解釋:
在頭文件alg.18-6-syn-pc-con-6.h中定義了必要的數(shù)據(jù)和結(jié)構(gòu):
#define BASE_ADDR 10 /* 共享內(nèi)存的前十個(gè)單位保留給控制結(jié)構(gòu)體ctln_pc_st,數(shù)據(jù)從下標(biāo)為10的單位開始循環(huán)數(shù)據(jù)隊(duì)列由(enqueue | dequeue) % buffer_size + BASE_ADDR表示 */struct ctln_pc_st {int BUFFER_SIZE; // 緩沖區(qū)大小,共享內(nèi)存中數(shù)據(jù)單元的數(shù)目int MAX_ITEM_NUM; // 要生產(chǎn)的項(xiàng)目數(shù)目int THREAD_PRO; // 生產(chǎn)者數(shù)目int THREAD_CONS; // 消費(fèi)者數(shù)目sem_t sem_mutex; // 表示互斥信號(hào)量sem_t stock; // 表示緩沖區(qū)中存儲(chǔ)數(shù)量的信號(hào)量sem_t emptyslot; // 表示緩沖區(qū)中空閑單元數(shù)目的信號(hào)量int item_num; // 已經(jīng)生產(chǎn)了的項(xiàng)目的總數(shù)目int consume_num; // 已經(jīng)消費(fèi)了的項(xiàng)目的總數(shù)目int enqueue; // 當(dāng)前生產(chǎn)者在循環(huán)隊(duì)列中的位置int dequeue; // 當(dāng)前消費(fèi)者在循環(huán)隊(duì)列中的位置int END_FLAG; // 生產(chǎn)者生產(chǎn)完所有項(xiàng)目完成工作后,置為1,否則置為0,表示生產(chǎn)者還未完成完工作 }; /* 60 bytes */struct data_pc_st {int item_no; // 生產(chǎn)項(xiàng)目時(shí)的項(xiàng)目序號(hào)int pro_no; // 生產(chǎn)者序號(hào)long int pro_tid; // 生產(chǎn)該項(xiàng)目的生產(chǎn)者的線程號(hào) }; /* 16 bytes */首先,進(jìn)程syn-pc-con會(huì)先創(chuàng)建一個(gè)共享內(nèi)存區(qū),然后使用execv()函數(shù)引發(fā)兩個(gè)子進(jìn)程,分別為syn-pc-producer生產(chǎn)者進(jìn)程和syn-pc-consumer消費(fèi)者進(jìn)程,兩個(gè)子進(jìn)程異步執(zhí)行,并將共享內(nèi)存標(biāo)識(shí)符作為參數(shù)傳遞給子進(jìn)程,父進(jìn)程等待子進(jìn)程執(zhí)行完后再接著執(zhí)行,最后結(jié)束。
alg.18-6-syn-pc-con-6.c:
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <sys/stat.h> #include <sys/shm.h> #include <semaphore.h> #include <wait.h> #include "alg.18-6-syn-pc-con-6.h"int shmid; void *shm = NULL; int detachshm(void);int main(int argc, char *argv[]) {pid_t childpid, pro_pid, cons_pid;struct stat statbuf;int buffer_size, max_item_num, thread_pro, thread_cons;// 需要在編譯命令中提供共享對象的文件名或路徑if (argc < 2) {printf("\nshared file object undeclared!\nUsage: syn-pc-con-6.o /home/myshm\n");return EXIT_FAILURE;}// 共享對象的文件應(yīng)該要存在if (stat(argv[1], &statbuf) == -1) {perror("stat()");return EXIT_FAILURE;}while (1) {// 輸入緩沖區(qū)大小printf("Pls input the buffer size(1-100, 0 quit): ");scanf("%d", &buffer_size);if (buffer_size <= 0) return 0;if (buffer_size > 100) continue;// 輸入要生產(chǎn)的項(xiàng)目的最大個(gè)數(shù)printf("Pls input the max number of items to be produced(1-10000, 0 quit): ");scanf("%d", &max_item_num);if (max_item_num <= 0) return 0;if (max_item_num > 10000) continue;// 輸入生產(chǎn)者的個(gè)數(shù)printf("Pls input the number of producers(1-500, 0 quit): ");scanf("%d", &thread_pro);if (thread_pro <= 0) return 0;if (thread_pro < 0) continue;// 輸入消費(fèi)者的個(gè)數(shù)printf("Pls input the number of consumers(1-500, 0 quit): ");scanf("%d", &thread_cons);if (thread_cons <= 0) return 0;if (thread_cons < 0) continue;break;}struct ctln_pc_st *ctln = NULL;struct data_pc_st *data = NULL;key_t key;int ret;// 獲取IPC鍵值if ((key = ftok(argv[1], 0x28)) < 0) { perror("ftok()");exit(EXIT_FAILURE);}// 獲取共享內(nèi)存標(biāo)識(shí)符shmid = shmget((key_t)key, (buffer_size + BASE_ADDR)*sizeof(struct data_pc_st), 0666 | IPC_CREAT);if (shmid == -1) {perror("shmget()");exit(EXIT_FAILURE);}// 把共享內(nèi)存區(qū)對象映射到調(diào)用進(jìn)程的地址空間,允許本進(jìn)程訪問共享內(nèi)存shm = shmat(shmid, 0, 0);if (shm == (void *)-1) {perror("shmat()");exit(EXIT_FAILURE);}// 設(shè)置共享內(nèi)存,分別設(shè)置控制結(jié)構(gòu)體ctln和數(shù)據(jù)結(jié)構(gòu)體datactln = (struct ctln_pc_st *)shm;data = (struct data_pc_st *)shm;// 初始化所有的控制參數(shù),共享內(nèi)存的前十個(gè)單位保留給控制參數(shù),數(shù)據(jù)從下標(biāo)為10的單位開始ctln->BUFFER_SIZE = buffer_size;ctln->MAX_ITEM_NUM = max_item_num;ctln->THREAD_PRO = thread_pro;ctln->THREAD_CONS = thread_cons; ctln->item_num = 0;ctln->consume_num = 0;// 循環(huán)數(shù)據(jù)隊(duì)列由(enqueue | dequeue) % buffer_size + BASE_ADDR表示ctln->enqueue = 0;ctln->dequeue = 0;ctln->END_FLAG = 0;// 初始化互斥信號(hào)量為1,對于進(jìn)程間共享,sem_init()的第二個(gè)參數(shù)必須設(shè)置為非零ret = sem_init(&ctln->sem_mutex, 1, 1);if (ret == -1) {perror("sem_init-mutex");return detachshm();}// 將表示緩沖區(qū)存儲(chǔ)數(shù)量的信號(hào)量ctln->stock初始化為0ret = sem_init(&ctln->stock, 1, 0);if (ret == -1) {perror("sem_init-stock");return detachshm();}// 將表示緩沖區(qū)中空閑單元數(shù)目的信號(hào)量ctln->emptyslot初始化為BUFFER_SIZEret = sem_init(&ctln->emptyslot, 1, ctln->BUFFER_SIZE);if (ret == -1) {perror("sem_init-emptyslot");return detachshm();}// 打印進(jìn)程進(jìn)程號(hào)printf("\nsyn-pc-con console pid = %d\n", getpid());// 將共享內(nèi)存標(biāo)識(shí)符作為參數(shù)傳遞給生產(chǎn)者進(jìn)程和消費(fèi)者進(jìn)程char *argv1[3];char execname[] = "./";char shmidstring[10];sprintf(shmidstring, "%d", shmid);argv1[0] = execname;argv1[1] = shmidstring;argv1[2] = NULL;childpid = vfork();if (childpid < 0) {perror("first fork");return detachshm();} // 調(diào)用生產(chǎn)者進(jìn)程else if (childpid == 0) {pro_pid = getpid();printf("producer pid = %d, shmid = %s\n", pro_pid, argv1[1]);execv("./alg.18-7-syn-pc-producer-6.o", argv1);}else {childpid = vfork();if (childpid < 0) {perror("second fork");return detachshm();} // 調(diào)用消費(fèi)者進(jìn)程else if (childpid == 0) {cons_pid = getpid();printf("consumer pid = %d, shmid = %s\n", cons_pid, argv1[1]);execv("./alg.18-8-syn-pc-consumer-6.o", argv1);}}// 等待生產(chǎn)者進(jìn)程和消費(fèi)者進(jìn)程結(jié)束后父進(jìn)程再執(zhí)行if (waitpid(pro_pid, 0, 0) != pro_pid)perror("wait pro");elseprintf("waiting pro_pid %d success.\n", pro_pid);if (waitpid(cons_pid, 0, 0) != cons_pid)perror("wait cons");elseprintf("waiting cons_pid %d success.\n", cons_pid);// 銷毀互斥信號(hào)量ctln->sem_mutexret = sem_destroy(&ctln->sem_mutex);if (ret == -1)perror("sem_destroy sem_mutex");// 銷毀表示緩沖區(qū)存儲(chǔ)數(shù)量的信號(hào)量ctln->sem_stockret = sem_destroy(&ctln->stock);if (ret == -1)perror("sem_destroy stock");// 銷毀表示緩沖區(qū)中空閑單元數(shù)目的信號(hào)量ctln->emptyslotret = sem_destroy(&ctln->emptyslot);if (ret == -1)perror("sem_destroy empty_slot");return detachshm(); }// 斷開進(jìn)程與共享內(nèi)存附加點(diǎn)的地址,釋放共享內(nèi)存區(qū) int detachshm(void) {if (shmdt(shm) == -1) {perror("shmdt()");exit(EXIT_FAILURE);}if (shmctl(shmid, IPC_RMID, 0) == -1) {perror("shmctl(IPC_RMID)");exit(EXIT_FAILURE);} }生產(chǎn)者進(jìn)程syn-pc-producer會(huì)創(chuàng)建THREAD_PRO個(gè)生產(chǎn)者線程,異步進(jìn)行生產(chǎn)。只有當(dāng)已經(jīng)生產(chǎn)的產(chǎn)品數(shù)量小于要生產(chǎn)的產(chǎn)品數(shù)量時(shí),才會(huì)執(zhí)行循環(huán)生產(chǎn)代碼,生產(chǎn)的產(chǎn)品插入到循環(huán)隊(duì)列中,當(dāng)已經(jīng)生產(chǎn)的產(chǎn)品數(shù)量等于要生產(chǎn)的產(chǎn)品數(shù)量時(shí),完成工作,生產(chǎn)者的進(jìn)程結(jié)束。
alg.18-7-syn-pc-producer-6.c:
#include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <sys/shm.h> #include <semaphore.h> #include <unistd.h> #include <sys/syscall.h> #include "alg.18-6-syn-pc-con-6.h"#define gettid() syscall(__NR_gettid)void *producer(void *arg) {// 獲取共享內(nèi)存結(jié)構(gòu)體,分別為控制結(jié)構(gòu)體和數(shù)據(jù)結(jié)構(gòu)體struct ctln_pc_st *ctln = (struct ctln_pc_st *)arg;struct data_pc_st *data = (struct data_pc_st *)arg;// 當(dāng)生產(chǎn)者已經(jīng)制造的項(xiàng)目數(shù)量小于要生產(chǎn)的項(xiàng)目時(shí)while (ctln->item_num < ctln->MAX_ITEM_NUM) {// 等待緩沖區(qū)空閑單元數(shù)目信號(hào)量大于0,表示有空閑單元可以生產(chǎn)后存放項(xiàng)目,然后將空閑單元數(shù)目信號(hào)量減一,繼續(xù)執(zhí)行sem_wait(&ctln->emptyslot);// 等待互斥信號(hào)量大于0,防止臨界沖突,然后將互斥鎖信號(hào)量減一,繼續(xù)執(zhí)行sem_wait(&ctln->sem_mutex);// 當(dāng)生產(chǎn)者已經(jīng)制造的項(xiàng)目數(shù)量小于要生產(chǎn)的項(xiàng)目時(shí)if (ctln->item_num < ctln->MAX_ITEM_NUM) {// 生產(chǎn)者已經(jīng)制造的項(xiàng)目數(shù)量加一,并將制造的項(xiàng)目設(shè)置好項(xiàng)目序列號(hào)和制造該項(xiàng)目的線程號(hào)后,放入循環(huán)隊(duì)列ctln->item_num++; ctln->enqueue = (ctln->enqueue + 1) % ctln->BUFFER_SIZE;(data + ctln->enqueue + BASE_ADDR)->item_no = ctln->item_num;(data + ctln->enqueue + BASE_ADDR)->pro_tid = gettid();printf("producer tid %ld prepared item no %d, now enqueue = %d\n", (data + ctln->enqueue + BASE_ADDR)->pro_tid, (data + ctln->enqueue + BASE_ADDR)->item_no, ctln->enqueue);// 當(dāng)生產(chǎn)者已經(jīng)制造的項(xiàng)目數(shù)量等于要生產(chǎn)的項(xiàng)目時(shí),說明完成工作,設(shè)置ctln->END_FLAG為1if (ctln->item_num == ctln->MAX_ITEM_NUM)ctln->END_FLAG = 1;// 將表示緩沖區(qū)中存儲(chǔ)數(shù)量的信號(hào)量加一,繼續(xù)執(zhí)行sem_post(&ctln->stock);} // 當(dāng)生產(chǎn)者已經(jīng)制造的項(xiàng)目數(shù)量不小于要生產(chǎn)的項(xiàng)目時(shí),將表示緩沖區(qū)空閑單元數(shù)目的信號(hào)量加一else {sem_post(&ctln->emptyslot);}// 然后將互斥鎖信號(hào)量加一,允許其它線程執(zhí)行sem_post(&ctln->sem_mutex);sleep(1);}pthread_exit(0); }int main(int argc, char *argv[]) {struct ctln_pc_st *ctln = NULL;struct data_pc_st *data = NULL;int shmid;void *shm = NULL;// 獲取共享內(nèi)存標(biāo)識(shí)符shmid = strtol(argv[1], NULL, 10);// 把共享內(nèi)存區(qū)對象映射到調(diào)用進(jìn)程的地址空間,允許本進(jìn)程訪問共享內(nèi)存shm = shmat(shmid, 0, 0);if (shm == (void *)-1) {perror("\nproducer shmat()");exit(EXIT_FAILURE);}// 獲取共享內(nèi)存結(jié)構(gòu)體,分別為控制結(jié)構(gòu)體ctln和數(shù)據(jù)結(jié)構(gòu)體datactln = (struct ctln_pc_st *)shm;data = (struct data_pc_st *)shm;pthread_t ptid[ctln->THREAD_PRO];int i, ret;// 創(chuàng)建ctln->THREAD_PRO個(gè)生產(chǎn)者線程for (i = 0; i < ctln->THREAD_PRO; ++i) {// 線程執(zhí)行函數(shù)為producerret = pthread_create(&ptid[i], NULL, &producer, shm);if (ret != 0) {perror("producer pthread_create()");break;}} // 主線程等待子線程都執(zhí)行完后再繼續(xù)執(zhí)行for (i = 0; i < ctln->THREAD_PRO; ++i) {pthread_join(ptid[i], NULL);}// 所有生產(chǎn)者都停止工作,以防止有些消費(fèi)者會(huì)拿走最后的項(xiàng)目,不超過THREAD_CON-1個(gè)消費(fèi)者會(huì)停留在sem_wait(&stock)的等待隊(duì)列中for (i = 0; i < ctln->THREAD_CONS - 1; ++i)sem_post(&ctln->stock);// 斷開進(jìn)程與共享內(nèi)存附加點(diǎn)的地址if (shmdt(shm) == -1) {perror("producer shmdt()");exit(EXIT_FAILURE);}return 0; }消費(fèi)者進(jìn)程syn-pc-consumer會(huì)創(chuàng)建THREAD_CONS個(gè)消費(fèi)者線程,異步進(jìn)行消費(fèi)。只有當(dāng)消費(fèi)者已經(jīng)消費(fèi)的項(xiàng)目數(shù)量小于生產(chǎn)者已經(jīng)生產(chǎn)的項(xiàng)目數(shù)量,或生產(chǎn)者還沒完成工作時(shí),才會(huì)執(zhí)行循環(huán)消費(fèi)代碼,消費(fèi)的產(chǎn)品從循環(huán)隊(duì)列中取出。
alg.18-8-syn-pc-consumer-6.c:
#include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <sys/shm.h> #include <semaphore.h> #include <unistd.h> #include <sys/syscall.h> #include "alg.18-6-syn-pc-con-6.h"#define gettid() syscall(__NR_gettid)void *consumer(void *arg) {// 獲取共享內(nèi)存結(jié)構(gòu)體,分別為控制結(jié)構(gòu)體和數(shù)據(jù)結(jié)構(gòu)體struct ctln_pc_st *ctln = (struct ctln_pc_st *)arg;struct data_pc_st *data = (struct data_pc_st *)arg;// 當(dāng)消費(fèi)者已經(jīng)消費(fèi)的項(xiàng)目數(shù)量小于生產(chǎn)者已經(jīng)生產(chǎn)的項(xiàng)目數(shù)量,或生產(chǎn)者還沒完成工作時(shí)while ((ctln->consume_num < ctln->item_num) || (ctln->END_FLAG == 0)) { // 等待表示緩沖區(qū)中存儲(chǔ)數(shù)量的信號(hào)量大于0,表示緩沖區(qū)中有項(xiàng)目可以消費(fèi),然后將存儲(chǔ)數(shù)量信號(hào)量減一,繼續(xù)執(zhí)行。如果存儲(chǔ)數(shù)量是空的,且所有的生產(chǎn)者都停止工作,那么一個(gè)或多個(gè)消費(fèi)者可能會(huì)永遠(yuǎn)等待sem_wait(&ctln->stock);// 等待互斥信號(hào)量大于0,防止臨界沖突,然后將互斥鎖信號(hào)量減一,繼續(xù)執(zhí)行sem_wait(&ctln->sem_mutex);// 當(dāng)消費(fèi)者已經(jīng)消費(fèi)的項(xiàng)目數(shù)量小于生產(chǎn)者已經(jīng)生產(chǎn)的項(xiàng)目數(shù)量if (ctln->consume_num < ctln->item_num) { // 從循環(huán)隊(duì)列中取出項(xiàng)目消費(fèi),打印取出項(xiàng)目的相關(guān)信息ctln->dequeue = (ctln->dequeue + 1) % ctln->BUFFER_SIZE;printf("\t\t\t\tconsumer tid %ld taken item no %d by pro %ld, now dequeue = %d\n", gettid(), (data + ctln->dequeue + BASE_ADDR)->item_no, (data + ctln->dequeue + BASE_ADDR)->pro_tid, ctln->dequeue);ctln->consume_num++;// 將表示緩沖區(qū)空閑單元數(shù)目的信號(hào)量加一,繼續(xù)執(zhí)行sem_post(&ctln->emptyslot);}// 當(dāng)消費(fèi)者已經(jīng)消費(fèi)的項(xiàng)目數(shù)量不小于生產(chǎn)者已經(jīng)生產(chǎn)的項(xiàng)目數(shù)量,將表示緩沖區(qū)中存儲(chǔ)數(shù)量的信號(hào)量加一else {sem_post(&ctln->stock);}// 然后將互斥鎖信號(hào)量加一,允許其它線程執(zhí)行sem_post(&ctln->sem_mutex);}pthread_exit(0); }int main(int argc, char *argv[]) {struct ctln_pc_st *ctln = NULL;struct data_pc_st *data = NULL;int shmid;void *shm = NULL;// 獲取共享內(nèi)存標(biāo)識(shí)符shmid = strtol(argv[1], NULL, 10);// 把共享內(nèi)存區(qū)對象映射到調(diào)用進(jìn)程的地址空間,允許本進(jìn)程訪問共享內(nèi)存shm = shmat(shmid, 0, 0);if (shm == (void *)-1) {perror("consumer shmat()");exit(EXIT_FAILURE);}// 獲取共享內(nèi)存結(jié)構(gòu)體,分別為控制結(jié)構(gòu)體ctln和數(shù)據(jù)結(jié)構(gòu)體datactln = (struct ctln_pc_st *)shm;data = (struct data_pc_st *)shm;pthread_t ptid[ctln->THREAD_CONS];int i, ret;// 創(chuàng)建ctln->THREAD_CONS個(gè)消費(fèi)者線程for (i = 0; i < ctln->THREAD_CONS; ++i) {// 線程執(zhí)行函數(shù)為consumerret = pthread_create(&ptid[i], NULL, &consumer, shm); if (ret != 0) {perror("consumer pthread_create()");break;}} // 主線程等待子線程都執(zhí)行完后再繼續(xù)執(zhí)行for (i = 0; i < ctln->THREAD_CONS; ++i)pthread_join(ptid[i], NULL);// 斷開進(jìn)程與共享內(nèi)存附加點(diǎn)的地址if (shmdt(shm) == -1) {perror("consumer shmdt()");exit(EXIT_FAILURE);} return 0; }POSIX條件變量
-
Pthreads中的條件變量的行為類似于監(jiān)視器上下文中使用的條件變量,后者提供了一種鎖定機(jī)制來確保數(shù)據(jù)完整性。
-
Pthreads通常用于C程序中。由于C語言沒有監(jiān)視器,互斥鎖與條件變量相關(guān)聯(lián)以完成鎖定。
-
Pthreads中的條件變量使用pthread_cond_t數(shù)據(jù)類型,并由pthread_cond_init()初始化。以下代碼創(chuàng)建并初始化條件變量及其關(guān)聯(lián)的互斥鎖:
pthread_mutex_t mutex; pthread_cond_t cond_var;pthread_mutex_init(&mutex, NULL); pthread_cond_init(&cond_var, NULL); -
例子:
- 線程可以使用Pthread條件變量等待條件子句(a == b)變?yōu)閠rue:pthread_mutex_lock(&mutex); while (a != b)pthread_cond_wait(&cond_var, &mutex); 臨界區(qū) pthread_mutex_unlock(&mutex);
-
在調(diào)用pthread_cond_wait()函數(shù)之前,必須鎖定與cond_var關(guān)聯(lián)的互斥鎖,因?yàn)樗糜诒Wo(hù)條件子句中的數(shù)據(jù)不受可能的競爭條件的影響。
-
pthread_cond_wait()函數(shù)用于等待條件變量。
-
一旦獲得了這個(gè)鎖,線程就會(huì)檢查條件并調(diào)用pthread_cond_wait(),當(dāng)(a != b)時(shí),將互斥鎖和cond_var作為參數(shù)傳遞,條件不正確。
-
pthread_cond_wait()將調(diào)用線程放在條件等待隊(duì)列的末尾,釋放互斥鎖以允許另一個(gè)線程訪問共享數(shù)據(jù),并可能更新其值,以便條件子句(a == b)的判斷結(jié)果為true。當(dāng)調(diào)用線程被激活時(shí),它將鎖定互斥鎖并再次檢查條件。
- 這一點(diǎn)很重要,因?yàn)楫?dāng)條件子句為true時(shí),條件等待隊(duì)列中調(diào)用線程之前的另一個(gè)線程可能會(huì)被調(diào)度。
-
例子:
- 線程可以調(diào)用pthread_cond_signal()函數(shù),從而發(fā)出一個(gè)線程在等待條件變量的信號(hào)。pthread_mutex_lock(&mutex); if (a == b)pthread_cond_signal(&cond_var); pthread_mutex_unlock(&mutex);
-
需要注意的是:
- pthread_cond_signal()不會(huì)釋放互斥鎖。
- pthread_mutex_unlock()釋放互斥鎖。
- 一旦釋放互斥鎖,發(fā)出信號(hào)的線程就成為互斥鎖的所有者,并從pthread_cond_wait()調(diào)用返回控制。
-
驗(yàn)證實(shí)驗(yàn)alg.18-9-pthread-cond-wait.c
執(zhí)行程序命令:
gcc alg.18-9-pthread-cond-wait.c -pthread ./a.out syn分析:
可以看到,變量count的自增和自減有序進(jìn)行,沒有發(fā)生競爭條件導(dǎo)致count的值錯(cuò)亂的情況。
實(shí)現(xiàn)細(xì)節(jié)解釋:
首先在全局中,將pthread_mutex_t互斥鎖標(biāo)識(shí)符類型變量mutex使用宏定義PTHREAD_MUTEX_INITIALIZER進(jìn)行靜態(tài)初始化,將pthread_cond_t條件變量類型變量cond使用宏定義PTHREAD_COND_INITIALIZER進(jìn)行初始化,
主函數(shù)最后會(huì)等待創(chuàng)建的線程都執(zhí)行完后再繼續(xù)進(jìn)行,然后使用pthread_mutex_destroy(&mutex)語句銷毀互斥鎖,使用語句pthread_cond_destroy(&cond)銷毀條件變量,結(jié)束程序。
主函數(shù)中會(huì)創(chuàng)建兩個(gè)線程,兩個(gè)線程異步執(zhí)行,其中一個(gè)線程的執(zhí)行函數(shù)為:
void *decrement(void *arg) { for (int i = 0; i < 4; i++) {pthread_mutex_lock(&mutex); while (count <= 0) /* wait until count > 0 */pthread_cond_wait(&cond, &mutex); count--; printf("\t\t\t\tcount = %d.\n", count); printf("\t\t\t\tUnlock decrement.\n"); pthread_mutex_unlock(&mutex); }return NULL; }在線程執(zhí)行函數(shù)中,有一個(gè)執(zhí)行4次的for循環(huán),里面每次循環(huán)首先獲取一個(gè)互斥鎖,以防止多個(gè)線程同時(shí)請求pthread_cond_wait()的競爭條件,當(dāng)變量count小于等于0時(shí),pthread_cond_wait()會(huì)先解除互斥鎖,然后在等待隊(duì)列中休眠,直到變量count大于0且等待條件成立被喚醒后才繼續(xù)執(zhí)行,先鎖定互斥鎖,然后count自減,打印此時(shí)count的值并釋放互斥鎖。
另一個(gè)線程的執(zhí)行函數(shù)為:
void *increment(void *arg) {for (int i = 0; i < 4; i++) {for (int j = 0; j < 10000; j++) ; /* sleep for a while */pthread_mutex_lock(&mutex); count++; printf("count = %d.\n", count);if (count > 0) pthread_cond_signal(&cond); printf("Unlock increment.\n"); pthread_mutex_unlock(&mutex); }return NULL; }在線程執(zhí)行函數(shù)中,有一個(gè)執(zhí)行4次的for循環(huán),里面每次循環(huán)首先利用for循環(huán)等待一段時(shí)間,然后獲取一個(gè)互斥鎖,接著使count自增,如果此時(shí)count大于0時(shí),使用語句pthread_cond_signal(&cond)激活一個(gè)正在等待該條件的線程,最后釋放互斥鎖。
內(nèi)容2:在 Lab Week 13 的基礎(chǔ)上用信號(hào)量解決線程池分配的互斥問題。
設(shè)計(jì)報(bào)告
線程池設(shè)計(jì)圖
代碼設(shè)計(jì)
測試代碼:
//threadpools.c文件 #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sched.h> #include <pthread.h> #include <sys/stat.h> #include <sys/types.h> #include <sys/wait.h> #include <sys/ipc.h> #include <sys/time.h> #include <sys/msg.h> #include <sys/syscall.h> #include <semaphore.h> #include <fcntl.h> #include <unistd.h>#define gettid() syscall(__NR_gettid)/* wrap the system call syscall(__NR_gettid), __NR_gettid = 224 */ #define gettidv2() syscall(SYS_gettid) /* a traditional wrapper */#define THREADS_NUM 10 // 線程池中的線程個(gè)數(shù) #define TASK_QUEUE_MAX_SIZE 12 // 任務(wù)的等待隊(duì)列的最大長度,等待隊(duì)列中的最大任務(wù)個(gè)數(shù)為長度減一 #define TASK_NUM 100 // 要執(zhí)行的任務(wù)總數(shù)// 線程池中每個(gè)線程執(zhí)行的任務(wù)的結(jié)構(gòu)體 typedef struct {void *(*function)(void *); // 執(zhí)行函數(shù)void *arg; // 參數(shù) } Task;// 任務(wù)循環(huán)隊(duì)列的數(shù)據(jù)結(jié)構(gòu) typedef struct {Task tasks[TASK_QUEUE_MAX_SIZE]; // 任務(wù)隊(duì)列數(shù)組int front; // 隊(duì)首下標(biāo)int rear; // 隊(duì)尾下標(biāo) } TaskQueue;// 線程池?cái)?shù)據(jù)結(jié)構(gòu) typedef struct {pthread_t threads[THREADS_NUM]; // 線程數(shù)組TaskQueue taskQueue; // 任務(wù)隊(duì)列int taskSum; // 剩余任務(wù)總數(shù),結(jié)束程序用sem_t sem_mutex; // 互斥信號(hào)量 } Threadpools;// 線程池中每個(gè)線程執(zhí)行的任務(wù) static void *executeTask(void *arg) {// 向每個(gè)線程傳入的參數(shù)是線程池Threadpools *pools = (Threadpools *)arg;while (1) {// 等待互斥信號(hào)量大于0,防止臨界沖突,然后將互斥鎖信號(hào)量減一,繼續(xù)執(zhí)行sem_wait(&pools->sem_mutex);// 當(dāng)任務(wù)隊(duì)列為空時(shí)while (pools->taskQueue.front == pools->taskQueue.rear) {// 如果已經(jīng)沒有剩余任務(wù)要處理,那么退出線程if (pools->taskSum == 0) {printf("Thread %ld exits.\n", gettid());sem_post(&pools->sem_mutex);pthread_exit(NULL);}// 否則等待任務(wù)隊(duì)列中有任務(wù)后再取任務(wù)進(jìn)行執(zhí)行printf("Thread %ld is waiting for a task.\n", gettid());sleep(1); }// 剩余任務(wù)總數(shù)減一pools->taskSum--;// 獲取任務(wù)隊(duì)列隊(duì)首的任務(wù)Task task;int front = pools->taskQueue.front;task.function = pools->taskQueue.tasks[front].function;task.arg = pools->taskQueue.tasks[front].arg;// 循環(huán)隊(duì)列隊(duì)首下標(biāo)加一pools->taskQueue.front = (front + 1) % TASK_QUEUE_MAX_SIZE;// 將互斥鎖信號(hào)量加一,允許其它線程執(zhí)行sem_post(&pools->sem_mutex);// 執(zhí)行任務(wù)(*(task.function))(task.arg);} }// 初始化線程池 void initThreadpools(Threadpools *pools) {int ret;// 任務(wù)隊(duì)列的隊(duì)首和隊(duì)尾的坐標(biāo)都為0pools->taskQueue.front = 0;pools->taskQueue.rear = 0;// 線程池中剩余的任務(wù)總數(shù)設(shè)置為總?cè)蝿?wù)數(shù)pools->taskSum = TASK_NUM;// 初始化互斥信號(hào)量為1ret = sem_init(&pools->sem_mutex, 1, 1);if (ret == -1) {perror("sem_init-mutex");exit(1);}// 創(chuàng)建線程池中的線程for(int i = 0; i < THREADS_NUM; ++i) {ret = pthread_create(&pools->threads[i], NULL, executeTask, (void *)pools);if(ret != 0) {fprintf(stderr, "pthread_create error: %s\n", strerror(ret));exit(1);}} }// 向任務(wù)隊(duì)列中添加任務(wù) void addTask(Threadpools *pools, void *(*function)(void *arg), void *arg) {// 當(dāng)任務(wù)隊(duì)列為滿時(shí),等待有任務(wù)被取出任務(wù)隊(duì)列不為滿再加入隊(duì)列while ((pools->taskQueue.rear + TASK_QUEUE_MAX_SIZE + 1 - pools->taskQueue.front) % TASK_QUEUE_MAX_SIZE == 0) {printf("Task %d is waiting to be added to the task queue.\n", *(int *)arg);sleep(1);}// 向任務(wù)隊(duì)列的隊(duì)尾加入任務(wù)Task task;task.function = function;task.arg = arg;int rear = pools->taskQueue.rear;pools->taskQueue.tasks[rear] = task;// 任務(wù)隊(duì)列隊(duì)尾下標(biāo)加一pools->taskQueue.rear = (rear + 1) % (TASK_QUEUE_MAX_SIZE); }// 任務(wù)函數(shù) void *taskFunction(void *arg) {// 獲取每個(gè)任務(wù)的任務(wù)號(hào)int *numptr = (int *)arg;int taskId = *numptr;// 打印線程池中的哪個(gè)線程正在處理此任務(wù)printf("Thread tid = %ld is dealing with task %d\n", gettid(), taskId);// 每個(gè)任務(wù)休眠1s后繼續(xù)執(zhí)行printf("Task %d is sleeping for 1s.\n", taskId);sleep(1);// 打印任務(wù)完成信息和線程被復(fù)用printf("\t\t\t\tTask %d is finished and Thread tid = %ld is reused\n", taskId, gettid());return 0; }int main() {int ret;// 創(chuàng)建并初始化線程池Threadpools pools;initThreadpools(&pools);// 傳入?yún)?shù)數(shù)組int num[TASK_NUM];for(int i = 0; i < TASK_NUM; ++i) {num[i] = i + 1;}// 向任務(wù)隊(duì)列中連續(xù)添加任務(wù)for(int i = 0; i < TASK_NUM; ++i) {addTask(&pools, taskFunction, (void *)&num[i]);}// 主線程等待線程池中的線程全部結(jié)束后再繼續(xù)for(int i = 0; i < THREADS_NUM; ++i) {ret = pthread_join(pools.threads[i], NULL);if(ret != 0) {fprintf(stderr, "pthread_join error: %s\n", strerror(ret));exit(1);}}// 所有任務(wù)都執(zhí)行完,線程池也退出printf("\nAll %d tasks have been finished.\n", TASK_NUM);// 銷毀互斥信號(hào)量ret = sem_destroy(&pools.sem_mutex);if (ret == -1) {perror("sem_destroy sem_mutex");} }首先進(jìn)行宏定義:
#define THREADS_NUM 10 // 線程池中的線程個(gè)數(shù) #define TASK_QUEUE_MAX_SIZE 12 // 任務(wù)的等待隊(duì)列的最大長度,等待隊(duì)列中的最大任務(wù)個(gè)數(shù)為長度減一 #define TASK_NUM 100 // 要執(zhí)行的任務(wù)總數(shù)然后定義使用到的數(shù)據(jù)結(jié)構(gòu):
任務(wù):
// 線程池中每個(gè)線程執(zhí)行的任務(wù)的結(jié)構(gòu)體 typedef struct {void *(*function)(void *); // 執(zhí)行函數(shù)void *arg; // 參數(shù) } Task;任務(wù)隊(duì)列和線程池:
// 任務(wù)循環(huán)隊(duì)列的數(shù)據(jù)結(jié)構(gòu) typedef struct {Task tasks[TASK_QUEUE_MAX_SIZE]; // 任務(wù)隊(duì)列數(shù)組int front; // 隊(duì)首下標(biāo)int rear; // 隊(duì)尾下標(biāo) } TaskQueue;// 線程池?cái)?shù)據(jù)結(jié)構(gòu) typedef struct {pthread_t threads[THREADS_NUM]; // 線程數(shù)組TaskQueue taskQueue; // 任務(wù)隊(duì)列int taskSum; // 剩余任務(wù)總數(shù),結(jié)束程序用sem_t sem_mutex; // 互斥信號(hào)量 } Threadpools;線程池初始化函數(shù):
// 初始化線程池 void initThreadpools(Threadpools *pools) {int ret;// 任務(wù)隊(duì)列的隊(duì)首和隊(duì)尾的坐標(biāo)都為0pools->taskQueue.front = 0;pools->taskQueue.rear = 0;// 線程池中剩余的任務(wù)總數(shù)設(shè)置為總?cè)蝿?wù)數(shù)pools->taskSum = TASK_NUM;// 初始化互斥信號(hào)量為1ret = sem_init(&pools->sem_mutex, 1, 1);if (ret == -1) {perror("sem_init-mutex");exit(1);}// 創(chuàng)建線程池中的線程for(int i = 0; i < THREADS_NUM; ++i) {ret = pthread_create(&pools->threads[i], NULL, executeTask, (void *)pools);if(ret != 0) {fprintf(stderr, "pthread_create error: %s\n", strerror(ret));exit(1);}} }創(chuàng)建線程池中的線程時(shí),可以看到每個(gè)線程執(zhí)行的函數(shù)都為executeTask()任務(wù)執(zhí)行函數(shù)。
對應(yīng)設(shè)計(jì)圖中的初始化線程池部分:
接著實(shí)現(xiàn)函數(shù)部分:
線程執(zhí)行函數(shù):
// 線程池中每個(gè)線程執(zhí)行的任務(wù) static void *executeTask(void *arg) {// 向每個(gè)線程傳入的參數(shù)是線程池Threadpools *pools = (Threadpools *)arg;while (1) {// 等待互斥信號(hào)量大于0,防止臨界沖突,然后將互斥鎖信號(hào)量減一,繼續(xù)執(zhí)行sem_wait(&pools->sem_mutex);// 當(dāng)任務(wù)隊(duì)列為空時(shí)while (pools->taskQueue.front == pools->taskQueue.rear) {// 如果已經(jīng)沒有剩余任務(wù)要處理,那么退出線程if (pools->taskSum == 0) {printf("Thread %ld exits.\n", gettid());sem_post(&pools->sem_mutex);pthread_exit(NULL);}// 否則等待任務(wù)隊(duì)列中有任務(wù)后再取任務(wù)進(jìn)行執(zhí)行printf("Thread %ld is waiting for a task.\n", gettid());sleep(1); }// 剩余任務(wù)總數(shù)減一pools->taskSum--;// 獲取任務(wù)隊(duì)列隊(duì)首的任務(wù)Task task;int front = pools->taskQueue.front;task.function = pools->taskQueue.tasks[front].function;task.arg = pools->taskQueue.tasks[front].arg;// 循環(huán)隊(duì)列隊(duì)首下標(biāo)加一pools->taskQueue.front = (front + 1) % TASK_QUEUE_MAX_SIZE;// 將互斥鎖信號(hào)量加一,允許其它線程執(zhí)行sem_post(&pools->sem_mutex);// 執(zhí)行任務(wù)(*(task.function))(task.arg);} }當(dāng)線程從任務(wù)隊(duì)列中獲取任務(wù)執(zhí)行時(shí),有可能發(fā)生條件競爭,多個(gè)線程同時(shí)取同一個(gè)任務(wù)進(jìn)行執(zhí)行,所以要在線程執(zhí)行函數(shù)處用信號(hào)量避免這種沖突,使線程取任務(wù)執(zhí)行有序進(jìn)行。
可以看到,每個(gè)線程執(zhí)行完任務(wù)后,若還有剩余任務(wù)且任務(wù)隊(duì)列不為空,線程會(huì)自動(dòng)從任務(wù)隊(duì)列中獲取任務(wù),繼續(xù)執(zhí)行任務(wù),而不用手動(dòng)為每一個(gè)任務(wù)指定一個(gè)空閑線程進(jìn)行執(zhí)行,任務(wù)隊(duì)列為循環(huán)隊(duì)列,每次從任務(wù)隊(duì)列的隊(duì)首獲取任務(wù),保證了FIFO。
對應(yīng)設(shè)計(jì)圖中的每個(gè)線程獲取任務(wù)的箭頭部分:
將任務(wù)添加到任務(wù)隊(duì)列函數(shù):
// 向任務(wù)隊(duì)列中添加任務(wù) void addTask(Threadpools *pools, void *(*function)(void *arg), void *arg) {// 當(dāng)任務(wù)隊(duì)列為滿時(shí),等待有任務(wù)被取出任務(wù)隊(duì)列不為滿再加入隊(duì)列while ((pools->taskQueue.rear + TASK_QUEUE_MAX_SIZE + 1 - pools->taskQueue.front) % TASK_QUEUE_MAX_SIZE == 0) {printf("Task %d is waiting to be added to the task queue.\n", *(int *)arg);sleep(1);}// 向任務(wù)隊(duì)列的隊(duì)尾加入任務(wù)Task task;task.function = function;task.arg = arg;int rear = pools->taskQueue.rear;pools->taskQueue.tasks[rear] = task;// 任務(wù)隊(duì)列隊(duì)尾下標(biāo)加一pools->taskQueue.rear = (rear + 1) % (TASK_QUEUE_MAX_SIZE); }可以看到,任務(wù)隊(duì)列為循環(huán)隊(duì)列,每次向任務(wù)隊(duì)列的隊(duì)尾添加任務(wù),保證了FIFO。
對應(yīng)設(shè)計(jì)圖中的將任務(wù)添加到任務(wù)隊(duì)列的箭頭部分:
每個(gè)任務(wù)執(zhí)行的函數(shù):
// 任務(wù)函數(shù) void *taskFunction(void *arg) {// 獲取每個(gè)任務(wù)的任務(wù)號(hào)int *numptr = (int *)arg;int taskId = *numptr;// 打印線程池中的哪個(gè)線程正在處理此任務(wù)printf("Thread tid = %ld is dealing with task %d\n", gettid(), taskId);// 每個(gè)任務(wù)休眠1s后繼續(xù)執(zhí)行printf("Task %d is sleeping for 1s.\n", taskId);sleep(1);// 打印任務(wù)完成信息和線程被復(fù)用printf("\t\t\t\tTask %d is finished and Thread tid = %ld is reused\n", taskId, gettid());return 0; }對應(yīng)設(shè)計(jì)圖中的每個(gè)任務(wù)執(zhí)行的內(nèi)容部分:
主函數(shù)中:
int main() {int ret;// 創(chuàng)建并初始化線程池Threadpools pools;initThreadpools(&pools);// 傳入?yún)?shù)數(shù)組int num[TASK_NUM];for(int i = 0; i < TASK_NUM; ++i) {num[i] = i + 1;}// 向任務(wù)隊(duì)列中連續(xù)添加任務(wù)for(int i = 0; i < TASK_NUM; ++i) {addTask(&pools, taskFunction, (void *)&num[i]);}// 主線程等待線程池中的線程全部結(jié)束后再繼續(xù)for(int i = 0; i < THREADS_NUM; ++i) {ret = pthread_join(pools.threads[i], NULL);if(ret != 0) {fprintf(stderr, "pthread_join error: %s\n", strerror(ret));exit(1);}}// 所有任務(wù)都執(zhí)行完,線程池也退出printf("\nAll %d tasks have been finished.\n", TASK_NUM);// 銷毀互斥信號(hào)量ret = sem_destroy(&pools.sem_mutex);if (ret == -1) {perror("sem_destroy sem_mutex");} }主函數(shù)中,先創(chuàng)建線程池,此時(shí)線程處在等待狀態(tài),然后再添加任務(wù),線程池中的線程執(zhí)行完所有的任務(wù)后,再退出程序。
執(zhí)行命令:
gcc threadpools.c -pthread ./a.out分析:
可以看到,一開始當(dāng)任務(wù)隊(duì)列中還沒有任務(wù)時(shí),線程池中的線程會(huì)等待任務(wù)隊(duì)列中有任務(wù)后再取出任務(wù)接著執(zhí)行。
可以看到,每個(gè)線程按照FIFO從任務(wù)隊(duì)列中取出任務(wù)進(jìn)行執(zhí)行,每個(gè)任務(wù)會(huì)休眠1s,如果任務(wù)隊(duì)列已滿,新的任務(wù)會(huì)等待任務(wù)隊(duì)列有任務(wù)被取出后再加入任務(wù)隊(duì)列。
可以看到,任務(wù)執(zhí)行完成之后,線程池中的線程會(huì)被復(fù)用,同一個(gè)tid的線程會(huì)自動(dòng)從任務(wù)隊(duì)列中獲取任務(wù),可以執(zhí)行不同的任務(wù)。
可以看到,當(dāng)所有的任務(wù)都被執(zhí)行完后,線程池中所有線程退出,回到主線程之后繼續(xù),程序正常退出。
測試用例:
在宏定義處,改變線程池中的線程個(gè)數(shù),任務(wù)隊(duì)列的最大長度和要執(zhí)行的認(rèn)為總數(shù),可以進(jìn)行測試程序:
測試用例1:
線程個(gè)數(shù)為10,任務(wù)隊(duì)列最大長度為12(最大任務(wù)個(gè)數(shù)為11),任務(wù)總數(shù)為50:
#define THREADS_NUM 10 // 線程池中的線程個(gè)數(shù) #define TASK_QUEUE_MAX_SIZE 12 // 任務(wù)的等待隊(duì)列的最大長度,等待隊(duì)列中的最大任務(wù)個(gè)數(shù)為長度減一 #define TASK_NUM 50 // 要執(zhí)行的任務(wù)總數(shù)執(zhí)行截圖:
任務(wù)總數(shù)稍大于線程個(gè)數(shù)和任務(wù)隊(duì)列長度時(shí),可以看到,線程池可以正常運(yùn)行。
測試用例2:
線程個(gè)數(shù)為10,任務(wù)隊(duì)列最大長度為12(最大任務(wù)個(gè)數(shù)為11),任務(wù)總數(shù)為5:
#define THREADS_NUM 10 // 線程池中的線程個(gè)數(shù) #define TASK_QUEUE_MAX_SIZE 12 // 任務(wù)的等待隊(duì)列的最大長度,等待隊(duì)列中的最大任務(wù)個(gè)數(shù)為長度減一 #define TASK_NUM 5 // 要執(zhí)行的任務(wù)總數(shù)執(zhí)行截圖:
任務(wù)總數(shù)小于線程個(gè)數(shù)和任務(wù)隊(duì)列長度時(shí),可以看到,線程池可以正常運(yùn)行。
測試用例3:
線程個(gè)數(shù)為10,任務(wù)隊(duì)列最大長度為12(最大任務(wù)個(gè)數(shù)為11),任務(wù)總數(shù)為10000:
#define THREADS_NUM 10 // 線程池中的線程個(gè)數(shù) #define TASK_QUEUE_MAX_SIZE 12 // 任務(wù)的等待隊(duì)列的最大長度,等待隊(duì)列中的最大任務(wù)個(gè)數(shù)為長度減一 #define TASK_NUM 10000 // 要執(zhí)行的任務(wù)總數(shù)執(zhí)行截圖:
當(dāng)任務(wù)總數(shù)遠(yuǎn)遠(yuǎn)多于線程個(gè)數(shù)時(shí),線程池可以正常運(yùn)行。
測試用例4:
線程個(gè)數(shù)為500,任務(wù)隊(duì)列最大長度為500(最大任務(wù)個(gè)數(shù)為499),任務(wù)總數(shù)為10000:
#define THREADS_NUM 500 // 線程池中的線程個(gè)數(shù) #define TASK_QUEUE_MAX_SIZE 500 // 任務(wù)的等待隊(duì)列的最大長度,等待隊(duì)列中的最大任務(wù)個(gè)數(shù)為長度減一 #define TASK_NUM 10000 // 要執(zhí)行的任務(wù)總數(shù)執(zhí)行截圖:
可以看到,當(dāng)線程個(gè)數(shù)較多時(shí),線程池可以正常運(yùn)行,由于使用了信號(hào)量,所以多個(gè)線程在獲取任務(wù)時(shí)不會(huì)發(fā)生條件競爭,導(dǎo)致沖突
使用之前沒有用信號(hào)量的程序進(jìn)行運(yùn)行相同參數(shù)的程序時(shí),可以看到,由于發(fā)生條件競爭,出現(xiàn)了無限阻塞現(xiàn)象,線程之間獲取任務(wù)時(shí)有沖突。
測試用例5:
線程個(gè)數(shù)為3000,任務(wù)隊(duì)列最大長度為4000(最大任務(wù)個(gè)數(shù)為3999),任務(wù)總數(shù)為500000:
#define THREADS_NUM 3000 // 線程池中的線程個(gè)數(shù) #define TASK_QUEUE_MAX_SIZE 4000 // 任務(wù)的等待隊(duì)列的最大長度,等待隊(duì)列中的最大任務(wù)個(gè)數(shù)為長度減一 #define TASK_NUM 500000 // 要執(zhí)行的任務(wù)總數(shù)執(zhí)行截圖:
進(jìn)行多線程高并發(fā)測試,可以看到,線程池可以正常運(yùn)行。
總結(jié)
以上是生活随笔為你收集整理的操作系统实验报告15:进程同步与互斥线程池的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 操作系统实验报告14:Peterson
- 下一篇: java信息管理系统总结_java实现科