日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

生产者消费者模型详解

發(fā)布時間:2024/3/12 编程问答 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 生产者消费者模型详解 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

生產(chǎn)者消費(fèi)者模型

文章目錄

  • 生產(chǎn)者消費(fèi)者模型
    • 什么是生產(chǎn)者消費(fèi)者模型
    • 基于BlockingQueue的生產(chǎn)者消費(fèi)者模型
      • 單生產(chǎn)者單消費(fèi)者模型
      • 多生產(chǎn)者多消費(fèi)者模型

什么是生產(chǎn)者消費(fèi)者模型

生產(chǎn)者消費(fèi)者模式就是通過一個容器來解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問題。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而通過阻塞隊列來進(jìn)行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理,直接扔給阻塞隊列,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊列里取,阻塞隊列就相當(dāng)于一個緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力。這個阻塞隊列就是用來給生產(chǎn)者和消費(fèi)者解耦的。

我們首先舉個例子便于理解生產(chǎn)者消費(fèi)者模型:

我們平常都會去超市買一些需要的物品,給我們提供產(chǎn)品的是供應(yīng)商,我們是消費(fèi)者,我們是去超市消費(fèi),給我們提供產(chǎn)品的而是供應(yīng)商,為什么我們會去超市呢?因為超市給我們提供了交易場所,生產(chǎn)者消費(fèi)者模型本質(zhì)是生產(chǎn)者把產(chǎn)品放到超市里面,消費(fèi)者把產(chǎn)品從超市中拿出去。

計算機(jī)中的數(shù)據(jù)相當(dāng)于產(chǎn)品,數(shù)據(jù)的加工處理是CPU以任務(wù)形式加工處理,生產(chǎn)者和消費(fèi)者代表的是多線程或者多進(jìn)程,超市相當(dāng)于一段內(nèi)存

生活中生產(chǎn)者將產(chǎn)品放在一個交易場所,讓我們自己去拿,為什么這樣呢?是因為效率高,比如菜鳥驛站的場景,快遞公司把一批包裹放進(jìn)菜鳥驛站,然后我們?nèi)ツ镁秃昧?#xff0c;提高了效率

廠商將交易場所當(dāng)作緩沖區(qū)來用,生產(chǎn)者和消費(fèi)者一般是進(jìn)程和線程,空間或者交易場所:一塊"內(nèi)存塊",產(chǎn)品相當(dāng)于數(shù)據(jù)

生產(chǎn)者消費(fèi)者321原則(自己總結(jié):3種關(guān)系,2種角色。一個交易場所):

3種關(guān)系,生產(chǎn)者和生產(chǎn)者,生產(chǎn)者和消費(fèi)者,消費(fèi)者和消費(fèi)者

生產(chǎn)者和生產(chǎn)者競爭關(guān)系(互斥),生產(chǎn)者和消費(fèi)者(同步),消費(fèi)者和消費(fèi)者競爭關(guān)系(互斥)

2種角色,生產(chǎn)者和消費(fèi)者

1個交易場所:一塊"內(nèi)存塊"

int add(int a,int b) {return a+b; } int main() {int a = 10;int b = 20;int c = add(a,b); }

方式一:

方式二:

第一種方式main函數(shù)生產(chǎn)a和b后調(diào)用add,main函數(shù)需要等待add函數(shù)執(zhí)行完才能繼續(xù)工作,這種稱為強(qiáng)耦合,這種工作方式是串行,這種方式耦合度高,而第二種方式是線程1去執(zhí)行main函數(shù)生成a和b,線程2去執(zhí)行add函數(shù),它們直接有一塊區(qū)域來給線程1放數(shù)據(jù)和給線程2取數(shù)據(jù),當(dāng)線程2在add的時候,線程1也可以放數(shù)據(jù),這種方式稱為并行,也稱之為解耦

并行是一種需求,表示有很多業(yè)務(wù)活動同時進(jìn)行。異步是一種代碼編寫方式,同步和串行:同步是一種代碼編寫方式,串行是一種需求

基于BlockingQueue的生產(chǎn)者消費(fèi)者模型

阻塞隊列有什么用?

如果不把數(shù)據(jù)往阻塞隊列里面放,假設(shè)數(shù)據(jù)通過網(wǎng)絡(luò)給服務(wù)器用了10ms,服務(wù)器讓線程1把數(shù)據(jù)寫入數(shù)據(jù)庫用了5ms,一共15ms,用戶一共等待15ms,如果把數(shù)據(jù)往阻塞隊列里面放,因為阻塞隊列是在內(nèi)存當(dāng)中的,而數(shù)據(jù)庫是文件是需要進(jìn)行IO的,放在阻塞隊列的時間要比放在數(shù)據(jù)庫的時間少一些,所以用戶等待的時間就少一些。

下面我們寫一個單生產(chǎn)者單消費(fèi)者模型:

單生產(chǎn)者單消費(fèi)者模型

Makefile的編寫:

main:main.ccg++ $^ -o $@ -lpthread .PHONY:clean clean:rm -f main

BlockQueue.hpp

#ifndef __BLOCK_QUEUE_H__ #define __BLOCK_QUEUE_H__ #include<iostream> #include<queue> #include<pthread.h> #include<unistd.h> class BlockQueue { private:std::queue<int> q;size_t _cap;pthread_mutex_t lock;pthread_cond_t c_cond;//消費(fèi)者的條件不滿足時,將來消費(fèi)者在該條件變量下等pthread_cond_t p_cond;//生產(chǎn)者的條件不滿足時,將來生產(chǎn)者在該條件下等 public:bool IsFull(){return q.size() >= _cap;}bool IsEmpty(){return q.empty();}void LockQueue(){pthread_mutex_lock(&lock);}void UnLockQueue(){pthread_mutex_unlock(&lock);}void WakeUpComsumer(){pthread_cond_signal(&c_cond);}void WakeUpProductor(){pthread_cond_signal(&p_cond);}void ProducterWait(){pthread_cond_wait(&p_cond,&lock);//這里為什么要傳鎖,我們在等待時肯定是條件不滿足了,我們通過判斷才知道條件滿不滿足,//判斷就需要保證進(jìn)入臨界區(qū),我們是持有鎖進(jìn)入的,wait的時候必須要釋放鎖//在調(diào)用該函數(shù)的時候,自動會釋放lock//當(dāng)該函數(shù)被返回時,返回到了臨界區(qū)內(nèi),所以,該函數(shù)會讓該線程重新持有該鎖}void ComsumerWait(){pthread_cond_wait(&c_cond,&lock);//在消費(fèi)者釋放鎖時,生產(chǎn)者正申請鎖,而消費(fèi)者在等待} public:BlockQueue(size_t cap):_cap(cap){pthread_mutex_init(&lock,nullptr);pthread_cond_init(&c_cond,nullptr);pthread_cond_init(&p_cond,nullptr);}void Put(int in){LockQueue();//if(isFull())while(isFull()){WakeUpComsumor();//喚醒消費(fèi)者ProducterWait();//生產(chǎn)者等待}q.push(in);UnLockQueue();}void Get(int& out){LockQueue();//if(IsEmpty)while(IsEmpty()){WakeUpProductor();ComsumerWait();//消費(fèi)者者等待}out = q.front();q.pop();LockQueue();}~BlockQueue(){pthread_cond_destroy(&lcok);pthread_cond_destroy(&c_cond);pthread_cond_destroy(&p_cond); } }; #endif

main.cc

#include"BlockQueue.hpp" using namespace std; void* consumer_run(void* arg) {BlockQueue *bq = (BlockQueue*)arg;while(true){int n = 0;bq->Get(n);cout<<"consumer data is : " << n <<endl;} } void* productor_run(void* arg) {BlockQueue *bq = (BlockQueue*)arg;while(true){int data = rand()%10+1;bq->Put(data);cout<<"product data is : "<<data<<endl;} } int main() {BlockQueue *bq = new BlockQueue(5);pthread_t c,p;pthread_create(&c,nullptr,consumer_run,(void*)bq);pthread_create(&p,nullptr,productor_run,(void*)bq);pthread_join(c,nullptr);pthread_join(p,nullptr);delete bq;return 0; }

上面代碼有個細(xì)節(jié)不能用if判斷滿和空,萬一被提前喚醒或者等待函數(shù)調(diào)用失敗,它們會繼續(xù)執(zhí)行push數(shù)據(jù)和pop數(shù)據(jù),此時就是非法操作了

多生產(chǎn)者多消費(fèi)者模型

上面是單生產(chǎn)者單消費(fèi)者模型,只解決了生產(chǎn)者和消費(fèi)者之間的關(guān)系,如果是多生產(chǎn)者多消費(fèi)者模型那么需要添加另外兩種關(guān)系:還有生產(chǎn)者和生產(chǎn)者的關(guān)系(互斥),以及消費(fèi)者和消費(fèi)者的關(guān)系(互斥)

所以我們只需要在外面生產(chǎn)者和消費(fèi)者的例程中放數(shù)據(jù)和取數(shù)據(jù)加鎖即可:

#include"BlockQueue.hpp" using namespace std; pthread_mutex_t c_lock; pthread_mutex_t p_lock;void* consumer_run(void* arg) {BlockQueue *bq = (BlockQueue*)arg;while(true){int n = 0;pthread_mutex_lock(&c_lock);bq->Get(n);pthread_mutex_unlock(&c_lock);cout<<"consumer data is : " << n <<endl;} } void* productor_run(void* arg) {BlockQueue *bq = (BlockQueue*)arg;while(true){pthread_mutex_lock(&p_lock);int data = rand()%10+1;bq->Put(data);pthread_mutex_unlock(&p_lock);cout<<"product data is : "<<data<<endl;} } int main() {BlockQueue *bq = new BlockQueue(5);pthread_t c,p;pthread_mutex_init(&c_lock,nullptr);pthread_mutex_init(&p_lock,nullptr);pthread_create(&c,nullptr,consumer_run,(void*)bq);pthread_create(&c,nullptr,consumer_run,(void*)bq);pthread_create(&c,nullptr,consumer_run,(void*)bq);pthread_create(&c,nullptr,consumer_run,(void*)bq);pthread_create(&p,nullptr,productor_run,(void*)bq);pthread_create(&p,nullptr,productor_run,(void*)bq);pthread_create(&p,nullptr,productor_run,(void*)bq);pthread_create(&p,nullptr,productor_run,(void*)bq);pthread_create(&p,nullptr,productor_run,(void*)bq);pthread_join(c,nullptr);pthread_join(p,nullptr);delete(bq);pthread_mutex_destroy(&c_lock);pthread_mutex_destroy(&p_lock);return 0; }

那么上面做的事情有什么用呢?下面我們來看下面這個代碼:

#ifndef __QUEUE_BLOCK_H__ #define __QUEUE_BLOCK_H__ #include<iostream> #include<queue> #include<pthread.h> #include<unistd.h> class Task { public:int _x;int _y; public:Task(){}Task(int x,int y):_x(x),_y(y){}int Run(){return _x+_y;}~Task(){} }; class BlockQueue { private:std::queue<Task> q;size_t _cap;pthread_mutex_t lock;pthread_cond_t c_cond;//消費(fèi)者的條件不滿足時,將來消費(fèi)者在該條件變量下等pthread_cond_t p_cond;//生產(chǎn)者的條件不滿足時,將來生產(chǎn)者在該條件下等 public:bool IsFull(){return q.size() >= _cap;}bool IsEmpty(){return q.empty();}void LockQueue(){pthread_mutex_lock(&lock);}void UnLockQueue(){pthread_mutex_unlock(&lock);}void WakeUpComsumer(){pthread_cond_signal(&c_cond);}void WakeUpProductor(){pthread_cond_signal(&p_cond);}void ProducterWait(){pthread_cond_wait(&p_cond,&lock);//這里為什么要傳鎖,我們在等待時肯定是條件不滿足了,我們通過判斷才知道條件滿不滿足,//判斷就需要保證進(jìn)入臨界區(qū),我們是持有鎖進(jìn)入的,wait的時候必須要釋放鎖//在調(diào)用該函數(shù)的時候,自動會釋放lock//當(dāng)該函數(shù)被返回時,返回到了臨界區(qū)內(nèi),所以,該函數(shù)會讓該線程重新持有該鎖}void ComsumerWait(){pthread_cond_wait(&c_cond,&lock);//在消費(fèi)者釋放鎖時,生產(chǎn)者正申請鎖,而消費(fèi)者在等待} public:BlockQueue(size_t cap):_cap(cap){pthread_mutex_init(&lock,nullptr);pthread_cond_init(&c_cond,nullptr);pthread_cond_init(&p_cond,nullptr);}void Put(Task t){LockQueue();//if(isFull())while(isFull()){WakeUpComsumor();//喚醒消費(fèi)者ProducterWait();//生產(chǎn)者等待}q.push(t);UnLockQueue();}void Get(Task& t){LockQueue();//if(IsEmpty)while(IsEmpty()){WakeUpProductor();ComsumerWait();//消費(fèi)者者等待}t = q.front();q.pop();UnLockQueue();}~BlockQueue(){pthread_cond_destroy(&lcok);pthread_cond_destroy(&c_cond);pthread_cond_destroy(&p_cond); } };#endif #include"BlockQueue.hpp"pthread_mutex_t c_lock; pthread_mutex_t p_lock; void* r1(void* arg) {//生產(chǎn)者BlockQueue* bq =(BlockQueue*)arg;while(true){pthread_mutex_lock(&p_lock);int x = rand()%10+1;int y = rand()%100+1;Task t(x,y);bq->Put(t);pthread_mutex_unlock(&p_lock);cout<<"Product Task is: "<<x<<'+'<<y<<"= ?"<<endl;} } void* r2(void* arg) {//消費(fèi)者BlockQueue* bq = (BlockQueue*)arg;while(true){//取數(shù)據(jù)Task t;pthread_mutex_lock(&c_lock);bq->Get(t);pthread_mutex_unlock(&c_lock);cout<<"Consumer: "<<t._x<<"+"<<t._y<<"="<<t.Run()<<endl;} } int main() {BlockQueue* bq = new BlockQueue(5);pthread_t c,p;pthread_mutex_init(&p_lock,NULL);pthread_mutex_init(&c_lock,NULL);pthread_create(&p,NULL,r1,(void*)bq);pthread_create(&p,NULL,r1,(void*)bq);pthread_create(&p,NULL,r1,(void*)bq);pthread_create(&p,NULL,r1,(void*)bq);pthread_create(&c,NULL,r2,(void*)bq);pthread_create(&c,NULL,r2,(void*)bq);pthread_create(&c,NULL,r2,(void*)bq);pthread_create(&c,NULL,r2,(void*)bq);pthread_create(&c,NULL,r2,(void*)bq);pthread_join(p,nullptr);pthread_join(c,nullptr);pthread_mutex_destroy(&c_lock);pthread_mutex_destroy(&p_lock);delete bq;return 0; }


這樣就做到了一些線程放數(shù)據(jù),另一些線程幫我們計算數(shù)據(jù),也就是完成相應(yīng)的任務(wù)。

總結(jié)

以上是生活随笔為你收集整理的生产者消费者模型详解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。