日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > linux >内容正文

linux

Linux多线程——生产者消费者模型

發布時間:2024/3/12 linux 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Linux多线程——生产者消费者模型 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

一.生產者消費者模型

? ? ? ? 1.1 什么是生成者消費者模型

? ? ? ? 1.2 生產者消費者模型的優點

? ? ? ? 1.3 基于阻塞隊列實現生產者消費者模型

? ? ? ? ?1.4 POSIX信號量

? ? ? ? 1.4.1 信號量概念

? ? ? ? 1.4.2 P操作和V操作

? ? ? ? 1.4.3 理解信號量

? ? ? ? 1.4.4 信號量的函數

? ? ? ? ?1.4.5 基于環形隊列實現生產者消費者模型


一.生產者消費者模型

? ? ? ? 1.1 什么是生成者消費者模型

? ? ? ? 一個進程中的線程有兩種角色,一種是生產者,一種是消費者。生產者為消費者提供任務,消費者拿到任務,解決任務。在生成者和消費者之間還有一個"交易場所",是一個內存塊。生成者線程將任務放到內存塊中,消費者線程在內存塊中拿任務。當內存塊數據達到一高水位線時,生產者會進行等待,喚醒消費者拿任務,當內存塊數據達到一低水位線時,消費者會等待,并且喚醒生產者生產任務。

? ? ? ? 生成者,消費者存在著3種關系。生產者和生產者之間是互斥的關系,消費者和消費者之間是互斥的關系,生產者和消費者之間是互斥和同步的關系。

? ? ? ? 1.2 生產者消費者模型的優點

? ? ? ? 例如一個正常的函數,不使用生產者消費者模型:

? ? ? ?上面是單線程的情況,即使是多線程,不使用生產者消費者模型,生產者直接給消費者送數據,整個進程的效率會是最慢的線程的效率。并且只能生產一個數據,消費一個數據。兩者還是串行的,耦合度高。

????????使用生產者消費者模型:

生產者和消費者模型的優點:

  • 實現線程的解耦
  • 支持線程之間并行運行
  • 效率高
  • 方便代碼維護

? ? ? ? 1.3 基于阻塞隊列實現生產者消費者模型

? ? ? ? 在多線程編程中,阻塞隊列是一種常用于實現生產者和消費者模型的數據結構。其普通隊列區別在于,當隊列為空時,從隊列獲取元素的操作將會被阻塞,直到隊列中被放入元素,當隊列滿的時候,往隊列中存放元素的操作也會被阻塞,直到有元素從隊列中取出。

生產者和生產者之間互斥,消費者和消費者之間互斥

? ? 在生產和消費的時候需要定義兩個互斥量,一個是生產者之間的,一個是消費者之間的。

生產者和消費者之間互斥且同步

? ? 定義一個互斥量,取數據的時候,不能放,放數據的時候,不能取

? ?有兩個條件,滿和空,定義兩個條件變量

實現的是多生產者,多消費者的模型。

注意:編碼時,要先環境再等待,不然能就喚醒不了了。

#pragma once #include<iostream> #include<queue> #include<pthread.h> //隊列元素個刷 #define NUM 5 //任務 struct Task{Task(){};Task(int x,int y):_x(x),_y(y){}int _x;int _y;int Add(){return _x + _y;} }; //提供兩個接口,放任務,和拿任務 class blockqueue{ private://加鎖void MakeLock(){pthread_mutex_lock(&_lock);}//取消鎖void CancelLock(){pthread_mutex_unlock(&_lock);}//喚醒消費者void WakeUpConsumer(){std::cout<<"Consumer wake up"<<std::endl;pthread_cond_signal(&_empty);}//喚醒生產者void WakeUpProductor(){std::cout<<"Productor wake up"<<std::endl;pthread_cond_signal(&_full);}//生產者等待void SleepProductor(){std::cout<<"Productor sleep"<<std::endl;pthread_cond_wait(&_full, &_lock);}//消費者等待void SleepConsumer(){std::cout<<"Consumer sleep"<<std::endl;pthread_cond_wait(&_empty, &_lock);}public:blockqueue(size_t cap = NUM):_cap(cap){pthread_mutex_init(&_lock, nullptr);pthread_cond_init(&_full, nullptr);pthread_cond_init(&_empty, nullptr);}~blockqueue(){pthread_cond_destroy(&_empty);pthread_cond_destroy(&_full);pthread_mutex_destroy(&_lock);}//放數據void Put(Task in){//q隊列是臨界資源,需要加鎖MakeLock();//需要使用循環while(_q.size()>=_cap){//先喚醒再等待WakeUpConsumer();SleepProductor();}_q.push(in);CancelLock();}//獲取數據void Get(Task& out){MakeLock();while(_q.empty()){WakeUpProductor();SleepConsumer();}out=_q.front();_q.pop();CancelLock();}private:std::queue<Task> _q;size_t _cap;pthread_mutex_t _lock;pthread_cond_t _empty;//消費者在此等待pthread_cond_t _full;//生產者在此等待};#include"BlockQueue.hpp" #include<unistd.h>#define PRO 2 #define CON 2 using namespace std; //定義兩個互斥量,生產者和消費者之間要互相競爭鎖 //決定哪個線程進來 pthread_mutex_t mutex1; pthread_mutex_t mutex2;void *Productor(void *arg){sleep(1);blockqueue *q = (blockqueue *)arg;while(true){sleep(1);int x=rand()%9+1;int y=rand()%20+1;Task t(x,y);//阻塞隊列是共享資源,需要上鎖pthread_mutex_lock(&mutex2);q->Put(t);cout<<pthread_self()<<":"<<x<<"+"<<y<<"="<<"?"<<endl;pthread_mutex_unlock(&mutex2);} } void *Consumer(void *arg){blockqueue *q = (blockqueue *)arg;while(true){sleep(1);Task t;//阻塞隊列是共享資源,需要上鎖pthread_mutex_lock(&mutex1);q->Get(t);cout<<pthread_self()<<":"<<t._x<<"+"<<t._y<<"="<<t.Add()<<endl;pthread_mutex_unlock(&mutex1);}}int main(){pthread_mutex_init(&mutex2,nullptr);pthread_mutex_init(&mutex1,nullptr);blockqueue *bq = new blockqueue();//生產者線程pthread_t td1[PRO];int i=0;for(; i<PRO; i++){pthread_create(td1+i, nullptr, Productor, (void *)bq);}//消費者線程pthread_t td2[CON];for(i=0; i<CON; i++){pthread_create(td2+i, nullptr, Consumer, (void *)bq);} for(i=0; i<PRO; i++){pthread_join(td1[i], nullptr);}for(i=0; i<CON; i++){pthread_join(td2[i], nullptr);}pthread_mutex_destroy(&mutex2);pthread_mutex_destroy(&mutex1);delete bq;return 0; }

?演示:

? ? ? ? ?1.4 POSIX信號量

? ? ? ? 1.4.1 信號量概念

? ? ? ? 有一種情況,我們可以將臨界資源分成若干份,一個線程只會使用臨界資源中的一份。

? ? ? ? 這個時候就有了信號量,信號量本質是一個計數器,描述的是臨界資源的有效個數。

? ? ? ? 1.4.2 P操作和V操作

? ? ? ? 假如:臨界資源可以分成5個部分,記為count=5。count就被稱作信號量。

? ? ? ? count--,一個執行流占有一個部分的操作叫做P操作。

? ? ? ? count++,一個執行流結束使用臨界資源的一部風叫做V操作。

????????當信號量count==0時,如果進行P操作,沒有信號量可以分配了,此時會阻塞等待。

????????由于信號量每一個線程看到的是同一份資源,信號量也是臨界資源,要保證P,V操作是原子的。

????????二元信號量相當于互斥鎖:?二元信號量只有1個信號量,只要一個線程占有,信號量的值就等于0,其它線程就需要等待。

? ? ? ? 1.4.3 理解信號量

? ? ? ? OS中會有很多的信號量,OS系統需要對它們進行管理,管理需要進行描述:

信號量可以描述為:

struct sem{

......

int count;//臨界資源有效個數

mutex lock;//只允許一個線程對臨界資源進行操作,需要上鎖

wait_queue *head;//等待隊列

......

}

? ? ? ? 1.4.4 信號量的函數

  • 初始化
#include <semaphore.h>int sem_init(sem_t *sem, int pshared, unsigned int value); 作用:初始化信號量 參數:sem,要初始化的信號量pshared:0表示線程間共享,非0表示進程間共享value:信號量初始值,信號量個數
  • 銷毀信號量
#include <semaphore.h>int sem_destroy(sem_t *sem); 作用:銷毀定義的信號量 參數:sem:要銷毀的信號量
  • 等待信號量,P操作
#include <semaphore.h>int sem_wait(sem_t *sem); 作用:等待信號量,將信號量的值減1,如果信號量為0,阻塞等待 參數:sem:要等待的信號量

  • 發布信號,V操作
#include <semaphore.h> int sem_post(sem_t *sem);作用:表示資源使用完畢,將信號量做加1操作 參數:sem:要發布的信號量

? ? ? ? ?1.4.5 基于環形隊列實現生產者消費者模型

  • 環形隊列采用數組模擬,用模運算來模擬環形特征
  • 當隊列滿了或者隊列為空時,都是消費者的下標和生產者的下標相同。不好判斷為空和滿的情況。
    • 有兩種方法:
    • 1.少用一個元素空間,這個時候為空時,下標相等,為滿時,生產者下標加1在取模等于消費者下標。
    • 2.增加一個計數器,來記錄元素個數。
  • 我們這里正好有信號量這個計數器,隊列里的每一個位置代表一個信號量。正好信號量就是這個計數器。

定義兩個信號量,一個信號量表示空格字space_sem,一個信號量表示數據_data_sem。

生產者:放元素,關注的說空格子這個信號量。

偽代碼:

? ? ? ? P(space_sem)

? ? ? ? 生產數據

? ? ? ? V(data_sem)

消費者:拿元素,關注的是數據這個信號量。?

偽代碼:

? ? ? ? P(data_sem)

? ? ? ? 生產數據

? ? ? ? V(space_sem)

????????執行到同一位置時,為空或者滿,此時要不就是space_sem為臨界資源總有效個數,data_sem為0,要不就是data_sem為臨界資源總有效個數,space_sem為0。這個時候,放數據和拿數據總會有一個在等待(P操作)。

? ? ? ? 當生產者快,消費者慢時,一開始生產者將數據放滿,在消費者消費一個,在生產者生產一個。隊列經常是滿的。

????????當生產者,消費者快時,一開始沒數據,需要生產者生產,在消費一個,現象時生產一個消費一個,隊列經常是空的。

多消費者多生產者:

#pragma once #include<iostream> #include<vector> #include<semaphore.h> #include<pthread.h> #define NUM 5class RingQueue{private:void P(sem_t& s){//信號量減減操作,如果為0等待sem_wait(&s);}void V(sem_t& s){//信號量加加操作sem_post(&s);}public:RingQueue(size_t cap = NUM):_v(cap),_cap(cap),_cindex(0),_pindex(0){sem_init(&_space_sem, 0, cap);sem_init(&_data_sem, 0, 0);}void Put(const int& in){//生產者關注格子數P(_space_sem);_v[_pindex]=in;_pindex++;_pindex %= _cap;V(_data_sem);}void Get(int& out){//消費者關注數據P(_data_sem);out = _v[_cindex];_cindex++;_cindex %= _cap;V(_space_sem);}~RingQueue(){sem_destroy(&_space_sem);sem_destroy(&_data_sem);_cindex = 0;_pindex = 0;}private:std::vector<int> _v;//隊列size_t _cap;//隊列容量sem_t _space_sem;//格子信號量sem_t _data_sem;//數據信號量int _cindex;//消費者位置int _pindex;//生產者位置}; #include"RingQueue.hpp" #include<unistd.h>using namespace std;#define CON 4 #define PRO 4pthread_mutex_t mutex1; pthread_mutex_t mutex2; void *consumer(void *arg){RingQueue *rq=(RingQueue *)arg;while(1){sleep(1);int x=0;pthread_mutex_lock(&mutex1);rq->Get(x);pthread_mutex_unlock(&mutex1);cout<<pthread_self()<<":"<<"consumer get a data :"<<x<<endl;} }void *productor(void *arg){RingQueue *rq=(RingQueue *)arg;while(1){//sleep(1);int x=rand()%10+1;pthread_mutex_lock(&mutex2);rq->Put(x);pthread_mutex_unlock(&mutex2);cout<<pthread_self()<<":"<<"productor put a data :"<<x<<endl;} } int main(){RingQueue *rq = new RingQueue();pthread_t td1[CON];pthread_t td2[PRO];int i=0;for(; i<CON; i++){pthread_create(td1+i, nullptr, consumer, (void *)rq);}for(i=0; i<PRO; i++){pthread_create(td2+i, nullptr, productor, (void *)rq);}for(i=0; i<CON; i++){pthread_join(td1[i], nullptr);}for(i=0; i<PRO; i++){pthread_join(td2[i], nullptr);}delete rq;return 0; }

總結

以上是生活随笔為你收集整理的Linux多线程——生产者消费者模型的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。