并发编程之多进程3 (生产者与消费者模型) 回调函数
生產消費者模型
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當于一個緩沖區,平衡了生產者和消費者的處理能力。
實例1:
from multiprocessing import Process,Queue import time,random,osdef procducer(q):for i in range(10):res='包子%s' %itime.sleep(0.5)q.put(res)print('%s 生產了 %s' %(os.getpid(),res))def consumer(q):while True:res=q.get()if res is None:breakprint('%s 吃 %s' %(os.getpid(),res))time.sleep(random.randint(2,3))if __name__ == '__main__':q=Queue()p=Process(target=procducer,args=(q,))c=Process(target=consumer,args=(q,))p.start()c.start()print('主')此時的問題是主進程永遠不會結束,原因是:生產者p在生產完后就結束了,但是消費者c在取空了q之后,則一直處于死循環中且卡在q.get()這一步。解決方式無非是讓生產者在生產完畢后,往隊列中再發一個結束信號,這樣消費者在接收到結束信號后就可以break出死循環。
例子2:
from multiprocessing import Process,Queue import time,random,osdef procducer(q):for i in range(10):res='包子%s' %itime.sleep(0.5)q.put(res)print('%s 生產了 %s' %(os.getpid(),res))def consumer(q):while True:res=q.get()if res is None:breakprint('%s 吃 %s' %(os.getpid(),res))time.sleep(random.randint(2,3))if __name__ == '__main__':q=Queue()p=Process(target=procducer,args=(q,))c=Process(target=consumer,args=(q,))p.start()c.start()p.join()q.put(None)print('主')注意:以上發送可以放在生產函數中循環完進行發送,當然也可以如上放在主進程中進行發送,但是前提是必須等生產子進程結束才可以。
?========================用個小栗子來理解=================================================================
舉一個小栗子,(寄信) 1、你把信寫好——相當于生產者制造數據2、你把信放入郵筒——相當于生產者把數據放入緩沖區3、郵遞員把信從郵筒取出——相當于消費者把數據取出緩沖區4、郵遞員把信拿去郵局做相應的處理——相當于消費者處理數據優勢 緩沖區作用: 1、解耦 假設生產者和消費者分別是兩個類。如果讓生產者直接調用消費者的某個方法,那么生產者對于消費者就會產生依賴(也就是耦合)。將來如果消費者的代碼發生變化,可能會影響到生產者。而如果兩者都依賴于某個緩沖區,兩者之間不直接依賴,耦合也就相應降低了。再舉個小栗子 如果不使用郵筒(也就是緩沖區),你必須得把信直接交給郵遞員。有同學會說,直接給郵遞員不是挺簡單的嘛?
其實不簡單,你必須得認識誰是郵遞員,才能把信給他(光憑身上穿的制服,萬一有人假冒,就慘了)。這就產生和你和郵遞員之間的依賴(相當于生產者和消費者的強 耦合)。
萬一哪天郵遞員換人了,你還要重新認識一下(相當于消費者變化導致修改生產者代碼)。
而郵筒相對來說比較固定,你依賴它的成本就比較低(相當于和緩沖區之間的弱 耦合)。2:支持并發 生產者直接調用消費者的某個方法,還有另一個弊端。由于函數調用是同步的(或者叫阻塞的),在消費者的方法沒有返回之前,生產者只能一直等著 而使用這個模型,生產者把制造出來的數據只需要放在緩沖區即可,不需要等待消費者來取再舉個小栗子 從寄信的例子來看。如果沒有郵筒,你得拿著信傻站在路口等郵遞員過來收(相當于生產者阻塞);
又或者郵遞員得挨家挨戶問,誰要寄信(相當于消費者輪詢)。不管是哪種方法,都挺耗時間的3:支持忙閑不均 緩沖區還有另一個好處。如果制造數據的速度時快時慢,緩沖區的好處就體現出來了。
當數據制造快的時候,消費者來不及處理,未處理的數據可以暫時存在緩沖區中。等生產者的制造速度慢下來,消費者再慢慢處理掉。再舉個小栗子 假設郵遞員一次只能帶走1000封信。萬一某次碰上情人節送賀卡,需要寄出去的信超過1000封,
這時候郵筒這個緩沖區就派上用場了。郵遞員把來不及帶走的信暫存在郵筒中,等下次過來時再拿走。
?
二、生產者消費模型
總結:
---生產者消費者模型程序中兩種角色:①負責生產數據(生產者);②負責處理數據(消費者)
---生產者消費者模型的作用:平衡生產者與消費者之間的速度差。
---實現方式:生產者——>隊列——>消費者
如上篇博客內容關于生產消費模型內容,在生產者生產數據的過程結束后,即使消費者已將數據完全獲取,消費者程序也不能結束,需由主進程或者生產者在結束生產程序后發送給消費者結束口令,消費者程序才會結束。但是如果出現多個消費者和多個生產者,這種情況又該如何解決?方法如下兩種:
1、根據消費者數量傳送結束信號(low)
from multiprocessing import Process,Queue import time,random,osdef procducer(q):for i in range(10):res='包子%s' %itime.sleep(0.5)q.put(res)print('%s 生產了 %s' %(os.getpid(),res))def consumer(q):while True:res=q.get()if res is None:breakprint('%s 吃 %s' %(os.getpid(),res))time.sleep(random.randint(2,3))if __name__ == '__main__':q=Queue()p=Process(target=procducer,args=(q,))c=Process(target=consumer,args=(q,))p.start()c.start()p.join()q.put(None)print('主') from multiprocessing import Process,Queue import time import random import os def producer(name,q):for i in range(10):res='%s%s' %(name,i)time.sleep(random.randint(1, 3))q.put(res)print('%s生產了%s' %(os.getpid(),res)) def consumer(name,q):while True:res=q.get()if not res:breakprint('%s吃了%s' %(name,res)) if __name__=='__main__':q=Queue()p1=Process(target=producer,args=('巧克力',q))p2=Process(target=producer,args=('甜甜圈',q))p3=Process(target=producer, args=('奶油蛋糕',q))c1=Process(target=consumer,args=('alex',q))c2=Process(target=consumer,args=('egon',q))_p=[p1,p2,p3,c1,c2]for p in _p:p.start()p1.join()p2.join()p3.join()'''保證生產程序結束后,再發送結束信號,發送數量和消費者數量一致'''q.put(None)q.put(None) 天啊嚕2、JoinableQueue隊列機制
JoinableQueue與Queue隊列基本相似,但前者隊列允許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。Queue實例的對象具有的方法JoinableQueue同樣具有,除此JoinableQueue還具有如下方法:
①q.join():生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止
②q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。如果調用此方法的次數大于從隊列中刪除項目的數量,將引發ValueError異常
from multiprocessing import Process,JoinableQueue import time import random def producer(name,food,q):for i in range(10):res='%s%s' %(food,i)time.sleep(random.randint(1, 3))q.put(res)print('%s生產了%s' %(name,res))q.join() #阻塞生產者進程,保證此進程結束時消費者進程已處理完其產生的數據 def consumer(name,q):while True:res=q.get()if not res:breakprint('%s吃了%s' %(name,res))q.task_done() #向q.join()發送一次信號,證明一個數據已經被取走了 if __name__=='__main__':q=JoinableQueue()p1=Process(target=producer,args=(1,'巧克力',q))p2=Process(target=producer,args=(2,'奶油蛋糕',q))p3 = Process(target=producer, args=(3,'冰糖葫蘆', q))c1=Process(target=consumer,args=('lishi',q))c2=Process(target=consumer,args=('jassin',q))'''守護進程保證主進程結束時,守護進程也立即結束'''c1.daemon=Truec2.daemon=True_p=[p1,p2,p3,c1,c2]for p in _p:p.start()p1.join()p2.join()p3.join()二、回調函數
進程池執行完一個獲得數據的進程,即刻要求通知主進程拿去解析數據。主進程調用一個函數去處理,這個函數便被稱為回調函數,要求進程池進程的結果為回調函數的參數。
爬蟲實例:
線程池
import requests from concurrent.futures import ThreadPoolExecutor(線程池),ProcessPoolExecutor(進程池) from threading import current_thread import time import osdef get(url): # 下載print('%s GET %s' %(current_thread().getName(),url))response=requests.get(url)time.sleep(3)if response.status_code == 200: # 固定,=200表示下載完成return {'url':url,'text':response.text}def parse(obj): # 解析res=obj.result()print('[%s] <%s> (%s)' % (current_thread().getName(), res['url'],len(res['text'])))if __name__ == '__main__':urls = ['https://www.python.org','https://www.baidu.com','https://www.jd.com','https://www.tmall.com',]t=ThreadPoolExecutor(2)for url in urls:t.submit(get,url).add_done_callback(parse) t.shutdown(wait=True)print('主',os.getpid())我們可以把耗時間(阻塞)的任務放到進程池中,然后指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。如果在主進程中等待進程池中所有任務都執行完畢后,再統一處理結果,則無需回調函數。
進程池
import requests from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import time import osdef get(url):print('%s GET %s' %(os.getpid(),url))response=requests.get(url)time.sleep(3)if response.status_code == 200:return {'url':url,'text':response.text}def parse(obj):res=obj.result()print('[%s] <%s> (%s)' % (os.getpid(), res['url'],len(res['text'])))if __name__ == '__main__':urls = ['https://www.python.org','https://www.baidu.com','https://www.jd.com','https://www.tmall.com',]t=ProcessPoolExecutor(2)for url in urls:t.submit(get,url).add_done_callback(parse)t.shutdown(wait=True)print('主',os.getpid())?
轉載于:https://www.cnblogs.com/jassin-du/p/7978020.html
總結
以上是生活随笔為你收集整理的并发编程之多进程3 (生产者与消费者模型) 回调函数的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 今日题解------uvalive 26
- 下一篇: 微信小程序出现【需要进行身份验证】弹框解