python 管道队列_20.2、python进程间通信——队列和管道
進(jìn)程間通信——隊(duì)列和管道(multiprocess.Queue、multiprocess.Pipe)
進(jìn)程間通信
IPC(Inter-Process Communication)
隊(duì)列
概念介紹
創(chuàng)建共享的進(jìn)程隊(duì)列,Queue是多進(jìn)程安全的隊(duì)列,可以使用Queue實(shí)現(xiàn)多進(jìn)程之間的數(shù)據(jù)傳遞。
Queue([maxsize])
創(chuàng)建共享的進(jìn)程隊(duì)列。
參數(shù) :maxsize是隊(duì)列中允許的最大項(xiàng)數(shù)。如果省略此參數(shù),則無大小限制。
底層隊(duì)列使用管道和鎖定實(shí)現(xiàn)。
方法介紹
Queue([maxsize])
創(chuàng)建共享的進(jìn)程隊(duì)列。maxsize是隊(duì)列中允許的最大項(xiàng)數(shù)。如果省略此參數(shù),則無大小限制。底層隊(duì)列使用管道和鎖定實(shí)現(xiàn)。另外,還需要運(yùn)行支持線程以便隊(duì)列中的數(shù)據(jù)傳輸?shù)降讓庸艿乐小?/p>
Queue的實(shí)例q具有以下方法:
q.get( [ block [ ,timeout ] ] )
返回q中的一個(gè)項(xiàng)目。如果q為空,此方法將阻塞,直到隊(duì)列中有項(xiàng)目可用為止。block用于控制阻塞行為,默認(rèn)為True. 如果設(shè)置為False,將引發(fā)Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時(shí)時(shí)間,用在阻塞模式中。如果在制定的時(shí)間間隔內(nèi)沒有項(xiàng)目變?yōu)榭捎?#xff0c;將引發(fā)Queue.Empty異常。
q.get_nowait( )
同q.get(False)方法。
q.put(item [, block [,timeout ] ] )
將item放入隊(duì)列。如果隊(duì)列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,默認(rèn)為True。如果設(shè)置為False,將引發(fā)Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時(shí)間長短。超時(shí)后將引發(fā)Queue.Full異常。
q.qsize()
返回隊(duì)列中目前項(xiàng)目的正確數(shù)量。此函數(shù)的結(jié)果并不可靠,因?yàn)樵诜祷亟Y(jié)果和在稍后程序中使用結(jié)果之間,隊(duì)列中可能添加或刪除了項(xiàng)目。在某些系統(tǒng)上,此方法可能引發(fā)NotImplementedError異常。
q.empty()
如果調(diào)用此方法時(shí) q為空,返回True。如果其他進(jìn)程或線程正在往隊(duì)列中添加項(xiàng)目,結(jié)果是不可靠的。也就是說,在返回和使用結(jié)果之間,隊(duì)列中可能已經(jīng)加入新的項(xiàng)目。
q.full()
如果q已滿,返回為True. 由于線程的存在,結(jié)果也可能是不可靠的(參考q.empty()方法)。。
其他方法(了解)
q.close()
關(guān)閉隊(duì)列,防止隊(duì)列中加入更多數(shù)據(jù)。調(diào)用此方法時(shí),后臺(tái)線程將繼續(xù)寫入那些已入隊(duì)列但尚未寫入的數(shù)據(jù),但將在此方法完成時(shí)馬上關(guān)閉。如果q被垃圾收集,將自動(dòng)調(diào)用此方法。關(guān)閉隊(duì)列不會(huì)在隊(duì)列使用者中生成任何類型的數(shù)據(jù)結(jié)束信號(hào)或異常。例如,如果某個(gè)使用者正被阻塞在get()操作上,關(guān)閉生產(chǎn)者中的隊(duì)列不會(huì)導(dǎo)致get()方法返回錯(cuò)誤。
q.cancel_join_thread()
不會(huì)再進(jìn)程退出時(shí)自動(dòng)連接后臺(tái)線程。這可以防止join_thread()方法阻塞。
q.join_thread()
連接隊(duì)列的后臺(tái)線程。此方法用于在調(diào)用q.close()方法后,等待所有隊(duì)列項(xiàng)被消耗。默認(rèn)情況下,此方法由不是q的原始創(chuàng)建者的所有進(jìn)程調(diào)用。調(diào)用q.cancel_join_thread()方法可以禁止這種行為。
代碼實(shí)例
單看隊(duì)列用法
'''
multiprocessing模塊支持進(jìn)程間通信的兩種主要形式:管道和隊(duì)列
都是基于消息傳遞實(shí)現(xiàn)的,但是隊(duì)列接口
'''from multiprocessing import Queue
q=Queue(3)#put ,get ,put_nowait,get_nowait,full,emptyq.put(3)
q.put(3)
q.put(3)# q.put(3)? # 如果隊(duì)列已經(jīng)滿了,程序就會(huì)停在這里,等待數(shù)據(jù)被別人取走,再將數(shù)據(jù)放入隊(duì)列。? ? ? ? ? # 如果隊(duì)列中的數(shù)據(jù)一直不被取走,程序就會(huì)永遠(yuǎn)停在這里。try:
q.put_nowait(3) # 可以使用put_nowait,如果隊(duì)列滿了不會(huì)阻塞,但是會(huì)因?yàn)殛?duì)列滿了而報(bào)錯(cuò)。except: # 因此我們可以用一個(gè)try語句來處理這個(gè)錯(cuò)誤。這樣程序不會(huì)一直阻塞下去,但是會(huì)丟掉這個(gè)消息。? ? print('隊(duì)列已經(jīng)滿了')# 因此,我們再放入數(shù)據(jù)之前,可以先看一下隊(duì)列的狀態(tài),如果已經(jīng)滿了,就不繼續(xù)put了。print(q.full()) #滿了print(q.get())print(q.get())print(q.get())# print(q.get()) # 同put方法一樣,如果隊(duì)列已經(jīng)空了,那么繼續(xù)取就會(huì)出現(xiàn)阻塞。try:
q.get_nowait(3) # 可以使用get_nowait,如果隊(duì)列滿了不會(huì)阻塞,但是會(huì)因?yàn)闆]取到值而報(bào)錯(cuò)。except: # 因此我們可以用一個(gè)try語句來處理這個(gè)錯(cuò)誤。這樣程序不會(huì)一直阻塞下去。? ? print('隊(duì)列已經(jīng)空了')print(q.empty()) #空了
上面這個(gè)例子還沒有加入進(jìn)程通信,只是先來看看隊(duì)列為我們提供的方法,以及這些方法的使用和現(xiàn)象。
import timefrom multiprocessing import Process, Queuedef f(q):
q.put([time.asctime(), 'from Eva', 'hello'])? #調(diào)用主函數(shù)中p進(jìn)程傳遞過來的進(jìn)程參數(shù) put函數(shù)為向隊(duì)列中添加一條數(shù)據(jù)。if __name__ == '__main__':
q = Queue() #創(chuàng)建一個(gè)Queue對(duì)象? ? p = Process(target=f, args=(q,)) #創(chuàng)建一個(gè)進(jìn)程? ? p.start()
print(q.get())
p.join()
批量生產(chǎn)數(shù)據(jù)放入隊(duì)列再批量獲取結(jié)果 x
上面是一個(gè)queue的簡單應(yīng)用,使用隊(duì)列q對(duì)象調(diào)用get函數(shù)來取得隊(duì)列中最先進(jìn)入的數(shù)據(jù)。 接下來看一個(gè)稍微復(fù)雜一些的例子:
import osimport timeimport multiprocessing# 向queue中輸入數(shù)據(jù)的函數(shù)def inputQ(queue):
info = str(os.getpid()) + '(put):' + str(time.asctime())
queue.put(info)# 向queue中輸出數(shù)據(jù)的函數(shù)def outputQ(queue):
info = queue.get()
print ('%s%s\033[32m%s\033[0m'%(str(os.getpid()), '(get):',info))# Mainif __name__ == '__main__':
multiprocessing.freeze_support()
record1 = []? # store input processes? ? record2 = []? # store output processes? ? queue = multiprocessing.Queue(3)
# 輸入進(jìn)程? ? for i in range(10):
process = multiprocessing.Process(target=inputQ,args=(queue,))
process.start()
record1.append(process)
# 輸出進(jìn)程? ? for i in range(10):
process = multiprocessing.Process(target=outputQ,args=(queue,))
process.start()
record2.append(process)
for p in record1:
p.join()
for p in record2:
p.join()
生產(chǎn)者消費(fèi)者模型
在并發(fā)編程中使用生產(chǎn)者和消費(fèi)者模式能夠解決絕大多數(shù)并發(fā)問題。該模式通過平衡生產(chǎn)線程和消費(fèi)線程的工作能力來提高程序的整體處理數(shù)據(jù)的速度。
為什么要使用生產(chǎn)者和消費(fèi)者模式
在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費(fèi)者就是消費(fèi)數(shù)據(jù)的線程。在多線程開發(fā)當(dāng)中,如果生產(chǎn)者處理速度很快,而消費(fèi)者處理速度很慢,那么生產(chǎn)者就必須等待消費(fèi)者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費(fèi)者的處理能力大于生產(chǎn)者,那么消費(fèi)者就必須等待生產(chǎn)者。為了解決這個(gè)問題于是引入了生產(chǎn)者和消費(fèi)者模式。
什么是生產(chǎn)者消費(fèi)者模式
生產(chǎn)者消費(fèi)者模式是通過一個(gè)容器來解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問題。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而通過阻塞隊(duì)列來進(jìn)行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理,直接扔給阻塞隊(duì)列,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊(duì)列里取,阻塞隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力。
基于隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型
from multiprocessing import Process,Queueimport time,random,osdef consumer(q):
while True:
res=q.get()
time.sleep(random.randint(1,3))
print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(q):
for i in range(10):
time.sleep(random.randint(1,3))
res='包子%s' %i
q.put(res)
print('\033[44m%s 生產(chǎn)了 %s\033[0m' %(os.getpid(),res))if __name__ == '__main__':
q=Queue()
#生產(chǎn)者們:即廚師們? ? p1=Process(target=producer,args=(q,))
#消費(fèi)者們:即吃貨們? ? c1=Process(target=consumer,args=(q,))
#開始? ? p1.start()
c1.start()
print('主')
此時(shí)的問題是主進(jìn)程永遠(yuǎn)不會(huì)結(jié)束,原因是:生產(chǎn)者p在生產(chǎn)完后就結(jié)束了,但是消費(fèi)者c在取空了q之后,則一直處于死循環(huán)中且卡在q.get()這一步。
解決方式無非是讓生產(chǎn)者在生產(chǎn)完畢后,往隊(duì)列中再發(fā)一個(gè)結(jié)束信號(hào),這樣消費(fèi)者在接收到結(jié)束信號(hào)后就可以break出死循環(huán)。
改良版——生產(chǎn)者消費(fèi)者模型
from multiprocessing import Process,Queueimport time,random,osdef consumer(q):
while True:
res=q.get()
if res is None:break #收到結(jié)束信號(hào)則結(jié)束? ? ? ? time.sleep(random.randint(1,3))
print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(q):
for i in range(10):
time.sleep(random.randint(1,3))
res='包子%s' %i
q.put(res)
print('\033[44m%s 生產(chǎn)了 %s\033[0m' %(os.getpid(),res))
q.put(None) #發(fā)送結(jié)束信號(hào)if __name__ == '__main__':
q=Queue()
#生產(chǎn)者們:即廚師們? ? p1=Process(target=producer,args=(q,))
#消費(fèi)者們:即吃貨們? ? c1=Process(target=consumer,args=(q,))
#開始? ? p1.start()
c1.start()
print('主')
注意:結(jié)束信號(hào)None,不一定要由生產(chǎn)者發(fā),主進(jìn)程里同樣可以發(fā),但主進(jìn)程需要等生產(chǎn)者結(jié)束后才應(yīng)該發(fā)送該信號(hào)
主進(jìn)程在生產(chǎn)者生產(chǎn)完畢后發(fā)送結(jié)束信號(hào)None
from multiprocessing import Process,Queueimport time,random,osdef consumer(q):
while True:
res=q.get()
if res is None:break #收到結(jié)束信號(hào)則結(jié)束? ? ? ? time.sleep(random.randint(1,3))
print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(q):
for i in range(2):
time.sleep(random.randint(1,3))
res='包子%s' %i
q.put(res)
print('\033[44m%s 生產(chǎn)了 %s\033[0m' %(os.getpid(),res))if __name__ == '__main__':
q=Queue()
#生產(chǎn)者們:即廚師們? ? p1=Process(target=producer,args=(q,))
#消費(fèi)者們:即吃貨們? ? c1=Process(target=consumer,args=(q,))
#開始? ? p1.start()
c1.start()
p1.join()
q.put(None) #發(fā)送結(jié)束信號(hào)? ? print('主')
但上述解決方式,在有多個(gè)生產(chǎn)者和多個(gè)消費(fèi)者時(shí),我們則需要用一個(gè)很low的方式去解決
多個(gè)消費(fèi)者的例子:有幾個(gè)消費(fèi)者就需要發(fā)送幾次結(jié)束信號(hào)
from multiprocessing import Process,Queueimport time,random,osdef consumer(q):
while True:
res=q.get()
if res is None:break #收到結(jié)束信號(hào)則結(jié)束? ? ? ? time.sleep(random.randint(1,3))
print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(name,q):
for i in range(2):
time.sleep(random.randint(1,3))
res='%s%s' %(name,i)
q.put(res)
print('\033[44m%s 生產(chǎn)了 %s\033[0m' %(os.getpid(),res))if __name__ == '__main__':
q=Queue()
#生產(chǎn)者們:即廚師們? ? p1=Process(target=producer,args=('包子',q))
p2=Process(target=producer,args=('骨頭',q))
p3=Process(target=producer,args=('泔水',q))
#消費(fèi)者們:即吃貨們? ? c1=Process(target=consumer,args=(q,))
c2=Process(target=consumer,args=(q,))
#開始? ? p1.start()
p2.start()
p3.start()
c1.start()
p1.join() #必須保證生產(chǎn)者全部生產(chǎn)完畢,才應(yīng)該發(fā)送結(jié)束信號(hào)? ? p2.join()
p3.join()
q.put(None) #有幾個(gè)消費(fèi)者就應(yīng)該發(fā)送幾次結(jié)束信號(hào)None? ? q.put(None) #發(fā)送結(jié)束信號(hào)? ? print('主')
JoinableQueue([maxsize])
創(chuàng)建可連接的共享進(jìn)程隊(duì)列。這就像是一個(gè)Queue對(duì)象,但隊(duì)列允許項(xiàng)目的使用者通知生產(chǎn)者項(xiàng)目已經(jīng)被成功處理。通知進(jìn)程是使用共享的信號(hào)和條件變量來實(shí)現(xiàn)的。
JoinableQueue的實(shí)例p除了與Queue對(duì)象相同的方法之外,還具有以下方法:
q.task_done()
使用者使用此方法發(fā)出信號(hào),表示q.get()返回的項(xiàng)目已經(jīng)被處理。如果調(diào)用此方法的次數(shù)大于從隊(duì)列中刪除的項(xiàng)目數(shù)量,將引發(fā)ValueError異常。
q.join()
生產(chǎn)者將使用此方法進(jìn)行阻塞,直到隊(duì)列中所有項(xiàng)目均被處理。阻塞將持續(xù)到為隊(duì)列中的每個(gè)項(xiàng)目均調(diào)用q.task_done()方法為止。
下面的例子說明如何建立永遠(yuǎn)運(yùn)行的進(jìn)程,使用和處理隊(duì)列上的項(xiàng)目。生產(chǎn)者將項(xiàng)目放入隊(duì)列,并等待它們被處理。
方法介紹
from multiprocessing import Process,JoinableQueueimport time,random,osdef consumer(q):
while True:
res=q.get()
time.sleep(random.randint(1,3))
print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
q.task_done() #向q.join()發(fā)送一次信號(hào),證明一個(gè)數(shù)據(jù)已經(jīng)被取走了def producer(name,q):
for i in range(10):
time.sleep(random.randint(1,3))
res='%s%s' %(name,i)
q.put(res)
print('\033[44m%s 生產(chǎn)了 %s\033[0m' %(os.getpid(),res))
q.join() #生產(chǎn)完畢,使用此方法進(jìn)行阻塞,直到隊(duì)列中所有項(xiàng)目均被處理。if __name__ == '__main__':
q=JoinableQueue()
#生產(chǎn)者們:即廚師們? ? p1=Process(target=producer,args=('包子',q))
p2=Process(target=producer,args=('骨頭',q))
p3=Process(target=producer,args=('泔水',q))
#消費(fèi)者們:即吃貨們? ? c1=Process(target=consumer,args=(q,))
c2=Process(target=consumer,args=(q,))
c1.daemon=True
c2.daemon=True
#開始? ? p_l=[p1,p2,p3,c1,c2]
for p in p_l:
p.start()
p1.join()
p2.join()
p3.join()
print('主')
#主進(jìn)程等--->p1,p2,p3等---->c1,c2? ? #p1,p2,p3結(jié)束了,證明c1,c2肯定全都收完了p1,p2,p3發(fā)到隊(duì)列的數(shù)據(jù)? ? #因而c1,c2也沒有存在的價(jià)值了,不需要繼續(xù)阻塞在進(jìn)程中影響主進(jìn)程了。應(yīng)該隨著主進(jìn)程的結(jié)束而結(jié)束,所以設(shè)置成守護(hù)進(jìn)程就可以了。
管道(了解)
介紹
#創(chuàng)建管道的類:Pipe([duplex]):在進(jìn)程之間創(chuàng)建一條管道,并返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連接對(duì)象,強(qiáng)調(diào)一點(diǎn):必須在產(chǎn)生Process對(duì)象之前產(chǎn)生管道#參數(shù)介紹:dumplex:默認(rèn)管道是全雙工的,如果將duplex射成False,conn1只能用于接收,conn2只能用于發(fā)送。#主要方法:? ? conn1.recv():接收conn2.send(obj)發(fā)送的對(duì)象。如果沒有消息可接收,recv方法會(huì)一直阻塞。如果連接的另外一端已經(jīng)關(guān)閉,那么recv方法會(huì)拋出EOFError。
conn1.send(obj):通過連接發(fā)送對(duì)象。obj是與序列化兼容的任意對(duì)象
#其他方法:conn1.close():關(guān)閉連接。如果conn1被垃圾回收,將自動(dòng)調(diào)用此方法
conn1.fileno():返回連接使用的整數(shù)文件描述符
conn1.poll([timeout]):如果連接上的數(shù)據(jù)可用,返回True。timeout指定等待的最長時(shí)限。如果省略此參數(shù),方法將立即返回結(jié)果。如果將timeout射成None,操作將無限期地等待數(shù)據(jù)到達(dá)。
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發(fā)送的一條完整的字節(jié)消息。maxlength指定要接收的最大字節(jié)數(shù)。如果進(jìn)入的消息,超過了這個(gè)最大值,將引發(fā)IOError異常,并且在連接上無法進(jìn)行進(jìn)一步讀取。如果連接的另外一端已經(jīng)關(guān)閉,再也不存在任何數(shù)據(jù),將引發(fā)EOFError異常。
conn.send_bytes(buffer [, offset [, size]]):通過連接發(fā)送字節(jié)數(shù)據(jù)緩沖區(qū),buffer是支持緩沖區(qū)接口的任意對(duì)象,offset是緩沖區(qū)中的字節(jié)偏移量,而size是要發(fā)送字節(jié)數(shù)。結(jié)果數(shù)據(jù)以單條消息的形式發(fā)出,然后調(diào)用c.recv_bytes()函數(shù)進(jìn)行接收
conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節(jié)消息,并把它保存在buffer對(duì)象中,該對(duì)象支持可寫入的緩沖區(qū)接口(即bytearray對(duì)象或類似的對(duì)象)。offset指定緩沖區(qū)中放置消息處的字節(jié)位移。返回值是收到的字節(jié)數(shù)。如果消息長度大于可用的緩沖區(qū)空間,將引發(fā)BufferTooShort異常。
pipe初使用
from multiprocessing import Process, Pipedef f(conn):
conn.send("Hello The_Third_Wave")
conn.close()if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv())
p.join()
應(yīng)該特別注意管道端點(diǎn)的正確管理問題。如果是生產(chǎn)者或消費(fèi)者中都沒有使用管道的某個(gè)端點(diǎn),就應(yīng)將它關(guān)閉。這也說明了為何在生產(chǎn)者中關(guān)閉了管道的輸出端,在消費(fèi)者中關(guān)閉管道的輸入端。如果忘記執(zhí)行這些步驟,程序可能在消費(fèi)者中的recv()操作上掛起。管道是由操作系統(tǒng)進(jìn)行引用計(jì)數(shù)的,必須在所有進(jìn)程中關(guān)閉管道后才能生成EOFError異常。因此,在生產(chǎn)者中關(guān)閉管道不會(huì)有任何效果,除非消費(fèi)者也關(guān)閉了相同的管道端點(diǎn)。
pipe初使用
from multiprocessing import Process, Pipedef f(parent_conn,child_conn):
#parent_conn.close() #不寫close將不會(huì)引發(fā)EOFError? ? while True:
try:
print(child_conn.recv())
except EOFError:
child_conn.close()if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(parent_conn,child_conn,))
p.start()
child_conn.close()
parent_conn.send('hello')
parent_conn.close()
p.join()
pipe實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型
from multiprocessing import Process,Pipedef consumer(p,name):
produce, consume=p
produce.close()
while True:
try:
baozi=consume.recv()
print('%s 收到包子:%s' %(name,baozi))
except EOFError:
breakdef producer(seq,p):
produce, consume=p
consume.close()
for i in seq:
produce.send(i)if __name__ == '__main__':
produce,consume=Pipe()
c1=Process(target=consumer,args=((produce,consume),'c1'))
c1.start()
seq=(i for i in range(10))
producer(seq,(produce,consume))
produce.close()
consume.close()
c1.join()
print('主進(jìn)程')
多個(gè)消費(fèi)之之間的競爭問題帶來的數(shù)據(jù)不安全問題
from multiprocessing import Process,Pipe,Lockdef consumer(p,name,lock):
produce, consume=p
produce.close()
while True:
lock.acquire()
baozi=consume.recv()
lock.release()
if baozi:
print('%s 收到包子:%s' %(name,baozi))
else:
consume.close()
breakdef producer(p,n):
produce, consume=p
consume.close()
for i in range(n):
produce.send(i)
produce.send(None)
produce.send(None)
produce.close()if __name__ == '__main__':
produce,consume=Pipe()
lock = Lock()
c1=Process(target=consumer,args=((produce,consume),'c1',lock))
c2=Process(target=consumer,args=((produce,consume),'c2',lock))
p1=Process(target=producer,args=((produce,consume),10))
c1.start()
c2.start()
p1.start()
produce.close()
consume.close()
c1.join()
c2.join()
p1.join()
print('主進(jìn)程')
進(jìn)程之間的數(shù)據(jù)共享
展望未來,基于消息傳遞的并發(fā)編程是大勢所趨
即便是使用線程,推薦做法也是將程序設(shè)計(jì)為大量獨(dú)立的線程集合,通過消息隊(duì)列交換數(shù)據(jù)。
這樣極大地減少了對(duì)使用鎖定和其他同步手段的需求,還可以擴(kuò)展到分布式系統(tǒng)中。
但進(jìn)程間應(yīng)該盡量避免通信,即便需要通信,也應(yīng)該選擇進(jìn)程安全的工具來避免加鎖帶來的問題。
以后我們會(huì)嘗試使用數(shù)據(jù)庫來解決現(xiàn)在進(jìn)程之間的數(shù)據(jù)共享問題。
Manager模塊介紹
進(jìn)程間數(shù)據(jù)是獨(dú)立的,可以借助于隊(duì)列或管道實(shí)現(xiàn)通信,二者都是基于消息傳遞的
雖然進(jìn)程間數(shù)據(jù)獨(dú)立,但可以通過Manager實(shí)現(xiàn)數(shù)據(jù)共享,事實(shí)上Manager的功能遠(yuǎn)不止于此
A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.
Manager例子
from multiprocessing import Manager,Process,Lockdef work(d,lock):
with lock: #不加鎖而操作共享的數(shù)據(jù),肯定會(huì)出現(xiàn)數(shù)據(jù)錯(cuò)亂? ? ? ? d['count']-=1if __name__ == '__main__':
lock=Lock()
with Manager() as m:
dic=m.dict({'count':100})
p_l=[]
for i in range(100):
p=Process(target=work,args=(dic,lock))
p_l.append(p)
p.start()
for p in p_l:
p.join()
print(dic)
進(jìn)程池和multiprocess.Pool模塊
進(jìn)程池
為什么要有進(jìn)程池?進(jìn)程池的概念。
在程序?qū)嶋H處理問題過程中,忙時(shí)會(huì)有成千上萬的任務(wù)需要被執(zhí)行,閑時(shí)可能只有零星任務(wù)。那么在成千上萬個(gè)任務(wù)需要被執(zhí)行的時(shí)候,我們就需要去創(chuàng)建成千上萬個(gè)進(jìn)程么?首先,創(chuàng)建進(jìn)程需要消耗時(shí)間,銷毀進(jìn)程也需要消耗時(shí)間。第二即便開啟了成千上萬的進(jìn)程,操作系統(tǒng)也不能讓他們同時(shí)執(zhí)行,這樣反而會(huì)影響程序的效率。因此我們不能無限制的根據(jù)任務(wù)開啟或者結(jié)束進(jìn)程。那么我們要怎么做呢?
在這里,要給大家介紹一個(gè)進(jìn)程池的概念,定義一個(gè)池子,在里面放上固定數(shù)量的進(jìn)程,有需求來了,就拿一個(gè)池中的進(jìn)程來處理任務(wù),等到處理完畢,進(jìn)程并不關(guān)閉,而是將進(jìn)程再放回進(jìn)程池中繼續(xù)等待任務(wù)。如果有很多任務(wù)需要執(zhí)行,池中的進(jìn)程數(shù)量不夠,任務(wù)就要等待之前的進(jìn)程執(zhí)行任務(wù)完畢歸來,拿到空閑進(jìn)程才能繼續(xù)執(zhí)行。也就是說,池中進(jìn)程的數(shù)量是固定的,那么同一時(shí)間最多有固定數(shù)量的進(jìn)程在運(yùn)行。這樣不會(huì)增加操作系統(tǒng)的調(diào)度難度,還節(jié)省了開閉進(jìn)程的時(shí)間,也一定程度上能夠?qū)崿F(xiàn)并發(fā)效果。
multiprocess.Pool模塊
概念介紹
Pool([numprocess? [,initializer [, initargs]]]):創(chuàng)建進(jìn)程池
參數(shù)介紹
1 numprocess:要?jiǎng)?chuàng)建的進(jìn)程數(shù),如果省略,將默認(rèn)使用cpu_count()的值2 initializer:是每個(gè)工作進(jìn)程啟動(dòng)時(shí)要執(zhí)行的可調(diào)用對(duì)象,默認(rèn)為None3 initargs:是要傳給initializer的參數(shù)組
主要方法
1 p.apply(func [, args [, kwargs]]):在一個(gè)池工作進(jìn)程中執(zhí)行func(*args,**kwargs),然后返回結(jié)果。2 '''需要強(qiáng)調(diào)的是:此操作并不會(huì)在所有池工作進(jìn)程中并執(zhí)行func函數(shù)。如果要通過不同參數(shù)并發(fā)地執(zhí)行func函數(shù),必須從不同線程調(diào)用p.apply()函數(shù)或者使用p.apply_async()'''3 4 p.apply_async(func [, args [, kwargs]]):在一個(gè)池工作進(jìn)程中執(zhí)行func(*args,**kwargs),然后返回結(jié)果。5 '''此方法的結(jié)果是AsyncResult類的實(shí)例,callback是可調(diào)用對(duì)象,接收輸入?yún)?shù)。當(dāng)func的結(jié)果變?yōu)榭捎脮r(shí),將理解傳遞給callback。callback禁止執(zhí)行任何阻塞操作,否則將接收其他異步操作中的結(jié)果。'''6? ? 7 p.close():關(guān)閉進(jìn)程池,防止進(jìn)一步操作。如果所有操作持續(xù)掛起,它們將在工作進(jìn)程終止前完成8 9 P.jion():等待所有工作進(jìn)程退出。此方法只能在close()或teminate()之后調(diào)用
其他方法(了解)
1 方法apply_async()和map_async()的返回值是AsyncResul的實(shí)例obj。實(shí)例具有以下方法2 obj.get():返回結(jié)果,如果有必要?jiǎng)t等待結(jié)果到達(dá)。timeout是可選的。如果在指定時(shí)間內(nèi)還沒有到達(dá),將引發(fā)一場。如果遠(yuǎn)程操作中引發(fā)了異常,它將在調(diào)用此方法時(shí)再次被引發(fā)。3 obj.ready():如果調(diào)用完成,返回True4 obj.successful():如果調(diào)用完成且沒有引發(fā)異常,返回True,如果在結(jié)果就緒之前調(diào)用此方法,引發(fā)異常5 obj.wait([timeout]):等待結(jié)果變?yōu)榭捎谩? obj.terminate():立即終止所有工作進(jìn)程,同時(shí)不執(zhí)行任何清理或結(jié)束任何掛起工作。如果p被垃圾回收,將自動(dòng)調(diào)用此函數(shù)
代碼實(shí)例
同步和異步
進(jìn)程池的同步調(diào)用
import os,timefrom multiprocessing import Pooldef work(n):
print('%s run' %os.getpid())
time.sleep(3)
return n**2if __name__ == '__main__':
p=Pool(3) #進(jìn)程池中從無到有創(chuàng)建三個(gè)進(jìn)程,以后一直是這三個(gè)進(jìn)程在執(zhí)行任務(wù)? ? res_l=[]
for i in range(10):
res=p.apply(work,args=(i,)) # 同步調(diào)用,直到本次任務(wù)執(zhí)行完畢拿到res,等待任務(wù)work執(zhí)行的過程中可能有阻塞也可能沒有阻塞? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? # 但不管該任務(wù)是否存在阻塞,同步調(diào)用都會(huì)在原地等著? ? print(res_l)
進(jìn)程池的異步調(diào)用
import osimport timeimport randomfrom multiprocessing import Pooldef work(n):
print('%s run' %os.getpid())
time.sleep(random.random())
return n**2if __name__ == '__main__':
p=Pool(3) #進(jìn)程池中從無到有創(chuàng)建三個(gè)進(jìn)程,以后一直是這三個(gè)進(jìn)程在執(zhí)行任務(wù)? ? res_l=[]
for i in range(10):
res=p.apply_async(work,args=(i,)) # 異步運(yùn)行,根據(jù)進(jìn)程池中有的進(jìn)程數(shù),每次最多3個(gè)子進(jìn)程在異步執(zhí)行? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? # 返回結(jié)果之后,將結(jié)果放入列表,歸還進(jìn)程,之后再執(zhí)行新的任務(wù)? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? # 需要注意的是,進(jìn)程池中的三個(gè)進(jìn)程不會(huì)同時(shí)開啟或者同時(shí)結(jié)束? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? # 而是執(zhí)行完一個(gè)就釋放一個(gè)進(jìn)程,這個(gè)進(jìn)程就去接收新的任務(wù)。? ? ? ? ? res_l.append(res)
# 異步apply_async用法:如果使用異步提交的任務(wù),主進(jìn)程需要使用jion,等待進(jìn)程池內(nèi)任務(wù)都處理完,然后可以用get收集結(jié)果? ? # 否則,主進(jìn)程結(jié)束,進(jìn)程池可能還沒來得及執(zhí)行,也就跟著一起結(jié)束了? ? p.close()
p.join()
for res in res_l:
print(res.get()) #使用get來獲取apply_aync的結(jié)果,如果是apply,則沒有g(shù)et方法,因?yàn)閍pply是同步執(zhí)行,立刻獲取結(jié)果,也根本無需get
server:進(jìn)程池版socket并發(fā)聊天
#Pool內(nèi)的進(jìn)程數(shù)默認(rèn)是cpu核數(shù),假設(shè)為4(查看方法os.cpu_count())
#開啟6個(gè)客戶端,會(huì)發(fā)現(xiàn)2個(gè)客戶端處于等待狀態(tài)
#在每個(gè)進(jìn)程內(nèi)查看pid,會(huì)發(fā)現(xiàn)pid使用為4個(gè),即多個(gè)客戶端公用4個(gè)進(jìn)程from socket import *from multiprocessing import Poolimport os
server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)def talk(conn):
print('進(jìn)程pid: %s' %os.getpid())
while True:
try:
msg=conn.recv(1024)
if not msg:break? ? ? ? ? ? conn.send(msg.upper())
except Exception:
breakif __name__ == '__main__':
p=Pool(4)
while True:
conn,*_=server.accept()
p.apply_async(talk,args=(conn,))
# p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時(shí)間只有一個(gè)客戶端能訪問
client
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))while True:
msg=input('>>: ').strip()
if not msg:continue? ? client.send(msg.encode('utf-8'))
msg=client.recv(1024)
print(msg.decode('utf-8'))
發(fā)現(xiàn):并發(fā)開啟多個(gè)客戶端,服務(wù)端同一時(shí)間只有4個(gè)不同的pid,只能結(jié)束一個(gè)客戶端,另外一個(gè)客戶端才會(huì)進(jìn)來.
回調(diào)函數(shù)
需要回調(diào)函數(shù)的場景:進(jìn)程池中任何一個(gè)任務(wù)一旦處理完了,就立即告知主進(jìn)程:我好了額,你可以處理我的結(jié)果了。主進(jìn)程則調(diào)用一個(gè)函數(shù)去處理該結(jié)果,該函數(shù)即回調(diào)函數(shù)
我們可以把耗時(shí)間(阻塞)的任務(wù)放到進(jìn)程池中,然后指定回調(diào)函數(shù)(主進(jìn)程負(fù)責(zé)執(zhí)行),這樣主進(jìn)程在執(zhí)行回調(diào)函數(shù)時(shí)就省去了I/O的過程,直接拿到的是任務(wù)的結(jié)果。
使用多進(jìn)程請求多個(gè)url來減少網(wǎng)絡(luò)等待浪費(fèi)的時(shí)間
from multiprocessing import Poolimport requestsimport jsonimport osdef get_page(url):
print(' get %s' %(os.getpid(),url))
respone=requests.get(url)
if respone.status_code == 200:
return {'url':url,'text':respone.text}def pasrse_page(res):
print(' parse %s' %(os.getpid(),res['url']))
parse_res='url: size:[%s]\n' %(res['url'],len(res['text']))
with open('db.txt','a') as f:
f.write(parse_res)if __name__ == '__main__':
urls=[
'https://www.baidu.com',
'https://www.python.org',
'https://www.openstack.org',
'https://help.github.com/',
'http://www.sina.com.cn/'? ? ]
p=Pool(3)
res_l=[]
for url in urls:
res=p.apply_async(get_page,args=(url,),callback=pasrse_page)
res_l.append(res)
p.close()
p.join()
print([res.get() for res in res_l]) #拿到的是get_page的結(jié)果,其實(shí)完全沒必要拿該結(jié)果,該結(jié)果已經(jīng)傳給回調(diào)函數(shù)處理了'''
打印結(jié)果:
get https://www.baidu.com
get https://www.python.org
get https://www.openstack.org
get https://help.github.com/
parse https://www.baidu.com
get http://www.sina.com.cn/
parse https://www.python.org
parse https://help.github.com/
parse http://www.sina.com.cn/
parse https://www.openstack.org
[{'url': 'https://www.baidu.com', 'text': '\r\n...',...}]
'''
爬蟲實(shí)例
import refrom urllib.request import urlopenfrom multiprocessing import Pooldef get_page(url,pattern):
response=urlopen(url).read().decode('utf-8')
return pattern,responsedef parse_page(info):
pattern,page_content=info
res=re.findall(pattern,page_content)
for item in res:
dic={
'index':item[0].strip(),
'title':item[1].strip(),
'actor':item[2].strip(),
'time':item[3].strip(),
}
print(dic)if __name__ == '__main__':
regex = r'
.*?<.>(\d+).*?title="(.*?)".*?class="movie-item-info".*?(.*?)
.*?(.*?)
'? ? pattern1=re.compile(regex,re.S)url_dic={
'http://maoyan.com/board/7':pattern1,
}
p=Pool()
res_l=[]
for url,pattern in url_dic.items():
res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)
res_l.append(res)
for i in res_l:
i.get()
如果在主進(jìn)程中等待進(jìn)程池中所有任務(wù)都執(zhí)行完畢后,再統(tǒng)一處理結(jié)果,則無需回調(diào)函數(shù)
無需回調(diào)函數(shù)
from multiprocessing import Poolimport time,random,osdef work(n):
time.sleep(1)
return n**2if __name__ == '__main__':
p=Pool()
res_l=[]
for i in range(10):
res=p.apply_async(work,args=(i,))
res_l.append(res)
p.close()
p.join() #等待進(jìn)程池中所有進(jìn)程執(zhí)行完畢? ? nums=[]
for res in res_l:
nums.append(res.get()) #拿到所有結(jié)果? ? print(nums) #主進(jìn)程拿到所有的處理結(jié)果,可以在主進(jìn)程中進(jìn)行統(tǒng)一進(jìn)行處理
總結(jié)
以上是生活随笔為你收集整理的python 管道队列_20.2、python进程间通信——队列和管道的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [译]JavaScript 究竟是如何工
- 下一篇: 广东工业大学专项设计_2020年广东工业