进程同步(multiprocess.Lock、multiprocess.Semaphore、multiprocess.Event) day38
進程同步(multiprocess.Lock、multiprocess.Semaphore、multiprocess.Event)
鎖 —— multiprocess.Lock
? ? ? 通過剛剛的學習,我們千方百計實現了程序的異步,讓多個任務可以同時在幾個進程中并發處理,他們之間的運行沒有順序,一旦開啟也不受我們控制。盡管并發編程讓我們能更加充分的利用IO資源,但是也給我們帶來了新的問題。
當多個進程使用同一份數據資源的時候,就會引發數據安全或順序混亂問題。?
#加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。 #雖然可以用文件共享數據實現進程間通信,但問題是: #1.效率低(共享數據基于文件,而文件是硬盤上的數據) #2.需要自己加鎖處理#因此我們最好找尋一種解決方案能夠兼顧:1、效率高(多個進程共享一塊內存的數據)2、幫我們處理好鎖問題。這就是mutiprocessing模塊為我們提供的基于消息的IPC通信機制:隊列和管道。 #隊列和管道都是將數據存放于內存中 #隊列又是基于(管道+鎖)實現的,可以讓我們從復雜的鎖問題中解脫出來, #我們應該盡量避免使用共享數據,盡可能使用消息傳遞和隊列,避免處理復雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可獲展性。信號量 —— multiprocess.Semaphore(了解)
#互斥鎖同時只允許一個線程更改數據,而信號量Semaphore是同時允許一定數量的線程更改數據 。 #假設商場里有4個迷你唱吧,所以同時可以進去4個人,如果來了第五個人就要在外面等待,等到有人出來才能再進去玩。 #實現: #信號量同步基于內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器為0時,acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和#V()的Python實現。信號量同步機制適用于訪問像服務器這樣的有限資源。 #信號量與進程池的概念很像,但是要區分開,信號量涉及到加鎖的概念?
# 信號量介紹Semaphore # 多進程中的組件 # ktv # 4個 # 一套資源 同一時間 只能被n個人訪問 # 某一段代碼 同一時間 只能被n個進程執行 import time#引入時間模塊 import random#引入隨機數 from multiprocessing import Process#引入進程模塊 from multiprocessing import Semaphore#引入信號模塊# sem = Semaphore(4)#實例化4個信號 # sem.acquire() # print('拿到第一把鑰匙') # sem.acquire() # print('拿到第二把鑰匙') # sem.acquire() # print('拿到第三把鑰匙') # sem.acquire() # print('拿到第四把鑰匙') # sem.acquire() # print('拿到第五把鑰匙') def ktv(i,sem):sem.acquire() #獲取鑰匙print('%s走進ktv'%i) #進入ktvtime.sleep(random.randint(1,5))#隨機選擇1到5之間的數print('%s走出ktv'%i)#打印走出ktvsem.release() #還鑰匙if __name__ == '__main__' :#如果為真sem = Semaphore(4)#實例化一個紅綠燈for i in range(20):#循環20個數p = Process(target=ktv,args=(i,sem))#開啟一個進程對象p.start()#開啟這個進程?
事件 —— multiprocess.Event(了解)
#python線程的事件用于主線程控制其他線程的執行,事件主要提供了三個方法 set、wait、clear。#事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那么當程序執行 #event.wait 方法時就會阻塞,如果“Flag”值為True,那么event.wait 方法時便不再阻塞。#clear:將“Flag”設置為False #set:將“Flag”設置為True?
?
#紅綠燈示例 # 通過一個信號 來控制 多個進程 同時 執行或者阻塞 # 事件 # from multiprocessing import Event # 一個信號可以使所有的進程都進入阻塞狀態 # 也可以控制所有的進程解除阻塞 # 一個事件被創建之后,默認是阻塞狀態 # e = Event() # 創建了一個事件 # print(e.is_set()) # 查看一個事件的狀態,默認被設置成阻塞 # e.set() # 將這個事件的狀態改為True # print(e.is_set()) # e.wait() # 是依據e.is_set()的值來決定是否阻塞的 # print(123456) # e.clear() # 將這個事件的狀態改為False # print(e.is_set()) # e.wait() # 等待 事件的信號被變成True # print('*'*10)# set 和 clear# 分別用來修改一個事件的狀態 True或者False # is_set 用來查看一個事件的狀態 # wait 是依據事件的狀態來決定自己是否在wait處阻塞# False阻塞 True不阻塞# 紅綠燈事件 import time#引入時間模塊 import random#引入隨機模塊 from multiprocessing import Event,Process#引入進程模塊和時間模塊 def cars(e,i):#定義一個函數if not e.is_set():#如果信號燈為真的時候print('car%i在等待'%i)#打印內容e.wait() # 阻塞 直到得到一個 事件狀態變成 True 的信號print('\033[0;32;40mcar%i通過\033[0m' % i)#打印通過def light(e):#定義一個燈while True:#循環為真if e.is_set():#如果事件狀態為真e.clear()#則清除信號燈print('\033[31m紅燈亮了\033[0m')#打印紅燈亮了else:#否則e.set()#設置狀態為真print('\033[32m綠燈亮了\033[0m')#打印綠燈亮了time.sleep(2)#睡2秒if __name__ == '__main__':#如果為真e = Event()#實例化一個事件traffic = Process(target=light,args=(e,))#定義一個燈的進程traffic.start()#開始進程for i in range(20):#循環20次car = Process(target=cars, args=(e,i))#創建20個汽車進程car.start()#啟動汽車進程time.sleep(random.random())#隨機睡,隨機出現0~1之間的小數?
進程間通信——隊列和管道(multiprocess.Queue、multiprocess.Pipe)
進程間通信
IPC(Inter-Process Communication) #進程間通信
隊列?
概念介紹
創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。?
#Queue([maxsize]) #創建共享的進程隊列。 #參數 :maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。 #底層隊列使用管道和鎖定實現。?
#方法介紹 Queue([maxsize]) 創建共享的進程隊列。maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還需要運行支持線程以便隊列中的數據傳輸到底層管道中。 Queue的實例q具有以下方法:q.get( [ block [ ,timeout ] ] ) 返回q中的一個項目。如果q為空,此方法將阻塞,直到隊列中有項目可用為止。block用于控制阻塞行為,默認為True. 如果設置為False,將引發Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內沒有項目變為可用,將引發Queue.Empty異常。#q.get_nowait( ) 同q.get(False)方法。#q.put(item [, block [,timeout ] ] ) 將item放入隊列。如果隊列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,默認為True。如果設置為False,將引發Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時后將引發Queue.Full異常。#q.qsize() 返回隊列中目前項目的正確數量。此函數的結果并不可靠,因為在返回結果和在稍后程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引發NotImplementedError異常。#q.empty() 如果調用此方法時 q為空,返回True。如果其他進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。#q.full() 如果q已滿,返回為True. 由于線程的存在,結果也可能是不可靠的(參考q.empty()方法)。。?
#其他方法 #q.close() 關閉隊列,防止隊列中加入更多數據。調用此方法時,后臺線程將繼續寫入那些已入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,如果某個使用者正被阻塞在#get()操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。#q.cancel_join_thread() 不會再進程退出時自動連接后臺線程。這可以防止join_thread()方法阻塞。#q.join_thread() 連接隊列的后臺線程。此方法用于在調用q.close()方法后,等待所有隊列項被消耗。默認情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread()方法可以禁止這種行為。?
代碼實例
''' multiprocessing模塊支持進程間通信的兩種主要形式:管道和隊列 都是基于消息傳遞實現的,但是隊列接口 '''from multiprocessing import Queue#引入一個隊列模塊 q=Queue(3)#實例化一個隊列#put ,get ,put_nowait,get_nowait,full,empty q.put(3)#放入隊列中 q.put(3)#放入隊列中 q.put(3)#放入隊列中 # q.put(3) # 如果隊列已經滿了,程序就會停在這里,等待數據被別人取走,再將數據放入隊列。# 如果隊列中的數據一直不被取走,程序就會永遠停在這里。 try:#異常處理q.put_nowait(3) # 可以使用put_nowait,如果隊列滿了不會阻塞,但是會因為隊列滿了而報錯。 except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去,但是會丟掉這個消息。print('隊列已經滿了')# 因此,我們再放入數據之前,可以先看一下隊列的狀態,如果已經滿了,就不繼續put了。 print(q.full()) #滿了print(q.get())#取出一個 print(q.get())#取出一個 print(q.get())#取出一個 # print(q.get()) # 同put方法一樣,如果隊列已經空了,那么繼續取就會出現阻塞。 try:#異常處理q.get_nowait(3) # 可以使用get_nowait,如果隊列滿了不會阻塞,但是會因為沒取到值而報錯。 except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去。print('隊列已經空了')print(q.empty()) #空了上面這個例子還沒有加入進程通信,只是先來看看隊列為我們提供的方法,以及這些方法的使用和現象。
#子進程發送數據給父進程 import time#引入一個時間模塊 from multiprocessing import Process, Queue#引入一個進程和隊列模塊def f(q):#定義一個函數q.put([time.asctime(), 'from Eva', 'hello']) #調用主函數中p進程傳遞過來的進程參數 put函數為向隊列中添加一條數據。if __name__ == '__main__':#定義一個函數q = Queue() #創建一個Queue對象p = Process(target=f, args=(q,)) #創建一個進程p.start()#開始進程print(q.get())#拿出一個p.join()#感知子進程結束上面是一個queue的簡單應用,使用隊列q對象調用get函數來取得隊列中最先進入的數據。 接下來看一個稍微復雜一些的例子:
#批量生產數據放入隊列再批量獲取結果 import os#引入操作系統模塊 import time#引入時間模塊 import multiprocessing#引入多元進程模塊# 向queue中輸入數據的函數 def inputQ(queue):#定義一個函數info = str(os.getpid()) + '(put):' + str(time.asctime())queue.put(info)向隊列中放入一個信息# 向queue中輸出數據的函數 def outputQ(queue):#取隊列中的數據info = queue.get()#取信息print ('%s%s\033[32m%s\033[0m'%(str(os.getpid()), '(get):',info))#打印這個內容# Main if __name__ == '__main__':#如果用戶名是當前用戶名multiprocessing.freeze_support()# record1 = [] # store input processesrecord2 = [] # store output processesqueue = multiprocessing.Queue(3)#實例化一個隊列# 輸入進程for i in range(10):#循環10個數process = multiprocessing.Process(target=inputQ,args=(queue,))#創建一個進程process.start()#開始這個進程record1.append(process)#添加到列表中# 輸出進程for i in range(10):#循環10個數process = multiprocessing.Process(target=outputQ,args=(queue,))#創建一個進程process.start()#開始進程record2.append(process)#添加到列表里for p in record1:#循環這個列表p.join()#感知子進程結束for p in record2:#循環這個進程p.join()#感知子進程結束生產者消費者模型
在并發編程中使用生產者和消費者模式能夠解決絕大多數并發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。
為什么要使用生產者和消費者模式
在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大于生產者,那么消費者就必須等待生產者。為了解決這個問題于是引入了生產者和消費者模式。
什么是生產者消費者模式
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當于一個緩沖區,平衡了生產者和消費者的處理能力。
基于隊列實現生產者消費者模型
#基于隊列實現生產者消費者模型 # 隊列 # 生產者消費者模型# 生產者 進程 # 消費者 進程 import time#引入時間模塊 import random#引入隨機數 from multiprocessing import Process,Queue#引入進程模塊和隊列模塊 def consumer(q,name):#定義一個消費者函數while True:#循環為真food = q.get()#拿出食物if food is None:#如果食物為空print('%s獲取到了一個空'%name)#打印胡去到一個空break#打斷print('\033[31m%s消費了%s\033[0m' % (name,food))#打印誰消費了什么食物time.sleep(random.randint(1,3))#隨機睡1~3秒def producer(name,food,q):#定義一個生產者函數for i in range(4):#循環4次time.sleep(random.randint(1,3))#隨機睡1~3秒f = '%s生產了%s%s'%(name,food,i)#誰生產了什么食物print(f)#打印內容q.put(f)#把食物放到隊列里if __name__ == '__main__':#如果名稱是當前名稱q = Queue(20)#實例化一個隊列20p1 = Process(target=producer,args=('Egon','包子',q))#創建一個進程p2 = Process(target=producer, args=('wusir','泔水', q))#創建一個進程c1 = Process(target=consumer, args=(q,'alex'))#創建一個進程c2 = Process(target=consumer, args=(q,'jinboss'))#創建一個進程p1.start()#啟動一個進程p2.start()#啟動一個進程c1.start()#啟動一個進程c2.start()#啟動一個進程p1.join()#感知p1進程結p2.join()#感知p2進程結束q.put(None)#往隊列中添加一個Noneq.put(None)#往隊列中添加一個None?
此時的問題是主進程永遠不會結束,原因是:生產者p在生產完后就結束了,但是消費者c在取空了q之后,則一直處于死循環中且卡在q.get()這一步。
解決方式無非是讓生產者在生產完畢后,往隊列中再發一個結束信號,這樣消費者在接收到結束信號后就可以break出死循環。
?
注意:結束信號None,不一定要由生產者發,主進程里同樣可以發,但主進程需要等生產者結束后才應該發送該信號
JoinableQueue([maxsize])?
創建可連接的共享進程隊列。這就像是一個Queue對象,但隊列允許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。?
#JoinableQueue的實例p除了與Queue對象相同的方法之外,還具有以下方法:#q.task_done() #使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。如果調用此方法的次數大于從隊列中刪除的項目數量,將引發ValueError異常。#q.join() #生產者將使用此方法進行阻塞,直到隊列中所有項目均被處理。阻塞將持續到為隊列中的每個項目均調用q.task_done()方法為止。 #下面的例子說明如何建立永遠運行的進程,使用和處理隊列上的項目。生產者將項目放入隊列,并等待它們被處理。 #JoinableQueue隊列實現消費之生產者模型 import time#引入一個時間模塊 import random#引入一個隨機數模塊 from multiprocessing import Process,JoinableQueue#引入進程模塊和隊列模塊 def consumer(q,name):#定義一個消費者函數while True:#循環為真food = q.get()#從隊列中拿出食物print('\033[31m%s消費了%s\033[0m' % (name,food))#打印內容time.sleep(random.randint(1,3))#隨機睡1~3秒q.task_done() # count - 1#def producer(name,food,q):#生產者for i in range(4):#循環4次time.sleep(random.randint(1,3))#隨機睡1~3秒f = '%s生產了%s%s'%(name,food,i)#誰生產了食物print(f)#打印這個內容q.put(f)#放入到隊列里q.join() # 阻塞 直到一個隊列中的所有數據 全部被處理完畢if __name__ == '__main__':#如果文件名為當前名稱q = JoinableQueue(20)#實例化一個隊列對象p1 = Process(target=producer,args=('Egon','包子',q))#創建一個生產者進程p2 = Process(target=producer, args=('wusir','泔水', q))#創建一個生產著進程c1 = Process(target=consumer, args=(q,'alex'))#創建一個消費者c2 = Process(target=consumer, args=(q,'jinboss'))#創建一個消費者進程p1.start()#開啟一個生產者進程p2.start()#開啟一個生產者進程c1.daemon = True # 設置為守護進程 主進程中的代碼執行完畢之后,子進程自動結束c2.daemon = True #設置守護進程c1.start() #開啟一個消費者進程c2.start() #開啟一個消費者進程p1.join() #感知一個生產者進程結束p2.join() # 感知一個進程的結束# 在消費者這一端:# 每次獲取一個數據# 處理一個數據# 發送一個記號 : 標志一個數據被處理成功# 在生產者這一端:# 每一次生產一個數據,# 且每一次生產的數據都放在隊列中# 在隊列中刻上一個記號# 當生產者全部生產完畢之后,# join信號 : 已經停止生產數據了# 且要等待之前被刻上的記號都被消費完# 當數據都被處理完時,join阻塞結束# consumer 中把所有的任務消耗完 # producer 端 的 join感知到,停止阻塞 # 所有的producer進程結束 # 主進程中的p.join結束 # 主進程中代碼結束 # 守護進程(消費者的進程)結束
?
管道(了解)
#創建管道的類: Pipe([duplex]):在進程之間創建一條管道,并返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連接對象,強調一點:必須在產生Process對象之前產生管道 #參數介紹: dumplex:默認管道是全雙工的,如果將duplex射成False,conn1只能用于接收,conn2只能用于發送。 #主要方法: conn1.recv():接收conn2.send(obj)發送的對象。如果沒有消息可接收,recv方法會一直阻塞。如果連接的另外一端已經關閉,那么recv方法會拋出EOFError。conn1.send(obj):通過連接發送對象。obj是與序列化兼容的任意對象#其他方法: conn1.close():關閉連接。如果conn1被垃圾回收,將自動調用此方法 conn1.fileno():返回連接使用的整數文件描述符 conn1.poll([timeout]):如果連接上的數據可用,返回True。timeout指定等待的最長時限。如果省略此參數,方法將立即返回結果。如果將timeout射成None,操作將無限期地等待數據到達。conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。如果進入的消息,超過了這個最大值,將引發IOError異常,并且在連接上無法進行進一步讀取。如果連接的另外一端已經關閉,再也不存在任何數據,將引發EOFError異常。 conn.send_bytes(buffer [, offset [, size]]):通過連接發送字節數據緩沖區,buffer是支持緩沖區接口的任意對象,offset是緩沖區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,然后調用c.recv_bytes()函數進行接收 conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,并把它保存在buffer對象中,該對象支持可寫入的緩沖區接口(即bytearray對象或類似的對象)。offset指定緩沖區中放置消息處的字節位移。返回值是收到的字節數。如果消息長度大于可用的緩沖區空間,將引發BufferTooShort異常。 from multiprocessing import Process, Pipe#引入進程模塊和管道模塊def f(conn):#定義一個函數conn.send("Hello The_Third_Wave")#發送一條信息conn.close()#關閉這個進程if __name__ == '__main__':#如果名字等于當前名稱parent_conn, child_conn = Pipe()#接收兩個參數p = Process(target=f, args=(child_conn,))#創建一個進程p.start()#啟動進程print(parent_conn.recv())#接收一個信息p.join()#等待進程結束?
應該特別注意管道端點的正確管理問題。如果是生產者或消費者中都沒有使用管道的某個端點,就應將它關閉。這也說明了為何在生產者中關閉了管道的輸出端,在消費者中關閉管道的輸入端。如果忘記執行這些步驟,程序可能在消費者中的recv()操作上掛起。管道是由操作系統進行引用計數的,必須在所有進程中關閉管道后才能生成EOFError異常。因此,在生產者中關閉管道不會有任何效果,除非消費者也關閉了相同的管道端點。?
#引發EOFError from multiprocessing import Process, Pipe引入進程模塊和管道模塊def f(parent_conn,child_conn):#定義一個函數傳入兩個參數#parent_conn.close() #不寫close將不會引發EOFErrorwhile True:#循環為真try:#異常處理print(child_conn.recv())#打印接收的值except EOFError:#萬能異常child_conn.close()#關閉連接if __name__ == '__main__':#如果用戶名是當前用戶名parent_conn, child_conn = Pipe()#接受兩個參數p = Process(target=f, args=(parent_conn,child_conn,))#實例化一個進程p.start()#而開始進程child_conn.close()#關閉客戶端連接parent_conn.send('hello')#發送信息parent_conn.close()#冠詞這個信息p.join()#等待進程結束 #pipe實現生產者和消費者 from multiprocessing import Process,Pipe#引入兩個模塊def consumer(p,name):#定義一個消費者produce, consume=p#接收兩個參數produce.close()#關閉生產者while True:#循環為真try:#異常處理baozi=consume.recv()#接收信息print('%s 收到包子:%s' %(name,baozi))#打印內容except EOFError:#異常處理break#打斷def producer(seq,p):#定義一個生產者produce, consume=p#接受兩個參數consume.close()#關閉生產者for i in seq:#循環打印produce.send(i)#發送if __name__ == '__main__':#如果用戶名等于當前用戶名produce,consume=Pipe()#接受兩個信息 c1=Process(target=consumer,args=((produce,consume),'c1'))#創建一個進程c1.start()#開始這個進程 seq=(i for i in range(10))#循環是個數producer(seq,(produce,consume))#運行生產者函數 produce.close()#關閉生產者consume.close()#關閉消費者 c1.join()#等待進程結束print('主進程')?
#多個消費之之間的競爭問題帶來的數據不安全問題 from multiprocessing import Process,Pipe,Lock#引入進程模塊,管道,鎖def consumer(p,name,lock):#定義一個消費者produce, consume=p#傳入兩個參數produce.close()#關閉生產者while True:#循環為真lock.acquire()#拿鑰匙baozi=consume.recv()#接收信息lock.release()#還鑰匙if baozi:#如果有信息print('%s 收到包子:%s' %(name,baozi))#打印接收到包子else:#否則consume.close()#關閉消費者break#打斷def producer(p,n):#定義一個生產者produce, consume=p#接收兩個參數consume.close()#關閉消費者for i in range(n):#循環produce.send(i)#發送iproduce.send(None)#生產者發送一個noneproduce.send(None)#生產者發送一個noneproduce.close()#關閉生產者if __name__ == '__main__':#如果名字等于當前用戶名produce,consume=Pipe()#接收兩個參數lock = Lock()#實例化一個鎖c1=Process(target=consumer,args=((produce,consume),'c1',lock))#創建一個消費者進程c2=Process(target=consumer,args=((produce,consume),'c2',lock))#創建一個消費者進程p1=Process(target=producer,args=((produce,consume),10))#創建一個生產者進程c1.start()#開啟進程c2.start()#開啟進程p1.start()#開啟進程 produce.close()#關閉生產者consume.close()#關閉消費者 c1.join()#等待進程接收c2.join()#等待進程接收p1.join()#等待進程接收print('主進程') #多個消費之之間的競爭問題帶來的數據不安全問題 from multiprocessing import Process,Pipe,Lock#引入進程模塊,管道,鎖def consumer(p,name,lock):#定義一個消費者produce, consume=p#傳入兩個參數produce.close()#關閉生產者while True:#循環為真lock.acquire()#拿鑰匙baozi=consume.recv()#接收信息lock.release()#還鑰匙if baozi:#如果有信息print('%s 收到包子:%s' %(name,baozi))#打印接收到包子else:#否則consume.close()#關閉消費者break#打斷def producer(p,n):#定義一個生產者produce, consume=p#接收兩個參數consume.close()#關閉消費者for i in range(n):#循環produce.send(i)#發送iproduce.send(None)#生產者發送一個noneproduce.send(None)#生產者發送一個noneproduce.close()#關閉生產者if __name__ == '__main__':#如果名字等于當前用戶名produce,consume=Pipe()#接收兩個參數lock = Lock()#實例化一個鎖c1=Process(target=consumer,args=((produce,consume),'c1',lock))#創建一個消費者進程c2=Process(target=consumer,args=((produce,consume),'c2',lock))#創建一個消費者進程p1=Process(target=producer,args=((produce,consume),10))#創建一個生產者進程c1.start()#開啟進程c2.start()#開啟進程p1.start()#開啟進程 produce.close()#關閉生產者consume.close()#關閉消費者 c1.join()#等待進程接收c2.join()#等待進程接收p1.join()#等待進程接收print('主進程')?
進程之間的數據共享
展望未來,基于消息傳遞的并發編程是大勢所趨
即便是使用線程,推薦做法也是將程序設計為大量獨立的線程集合,通過消息隊列交換數據。
這樣極大地減少了對使用鎖定和其他同步手段的需求,還可以擴展到分布式系統中。
但進程間應該盡量避免通信,即便需要通信,也應該選擇進程安全的工具來避免加鎖帶來的問題。
以后我們會嘗試使用數據庫來解決現在進程之間的數據共享問題。
#Manger模塊介紹 #進程間數據是獨立的,可以借助于隊列或管道實現通信,二者都是基于消息傳遞的 #雖然進程間數據獨立,但可以通過Manager實現數據共享,事實上Manager的功能遠不止于此#A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.#A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. from multiprocessing import Manager,Process,Lock#引入進程模塊,鎖模塊 def work(d,lock):#定義一個工作方法with lock: #不加鎖而操作共享的數據,肯定會出現數據錯亂d['count']-=1if __name__ == '__main__':#如果用戶名等于當前用戶名lock=Lock()#實例化一個鎖 with Manager() as m:dic=m.dict({'count':100})#傳入一個字典p_l=[]#創建一個空列表for i in range(100):#循環100個數p=Process(target=work,args=(dic,lock))##創建一個進程p_l.append(p)#添加到列表里p.start()#開始進程for p in p_l:#循環列表p.join()#等待進程結束print(dic)#打印這個字典?
進程池和multiprocess.Pool模塊
進程池
為什么要有進程池?進程池的概念。
在程序實際處理問題過程中,忙時會有成千上萬的任務需要被執行,閑時可能只有零星任務。那么在成千上萬個任務需要被執行的時候,我們就需要去創建成千上萬個進程么?首先,創建進程需要消耗時間,銷毀進程也需要消耗時間。第二即便開啟了成千上萬的進程,操作系統也不能讓他們同時執行,這樣反而會影響程序的效率。因此我們不能無限制的根據任務開啟或者結束進程。那么我們要怎么做呢?
在這里,要給大家介紹一個進程池的概念,定義一個池子,在里面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等到處理完畢,進程并不關閉,而是將進程再放回進程池中繼續等待任務。如果有很多任務需要執行,池中的進程數量不夠,任務就要等待之前的進程執行任務完畢歸來,拿到空閑進程才能繼續執行。也就是說,池中進程的數量是固定的,那么同一時間最多有固定數量的進程在運行。這樣不會增加操作系統的調度難度,還節省了開閉進程的時間,也一定程度上能夠實現并發效果。
multiprocess.Pool模塊
概念介紹
#Pool([numprocess [,initializer [, initargs]]]):創建進程池 #1 numprocess:要創建的進程數,如果省略,將默認使用cpu_count()的值 #2 initializer:是每個工作進程啟動時要執行的可調用對象,默認為None #3 initargs:是要傳給initializer的參數組?
#1 p.apply(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然后返回結果。 #2 '''需要強調的是:此操作并不會在所有池工作進程中并執行func函數。如果要通過不同參數并發地執行func函數,必須從不同線程調用p.apply()函數或者使用p.apply_async()''' #3 #4 p.apply_async(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然后返回結果。 #5 '''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他異步操作中的結果。''' #6 #7 p.close():關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成 #8 #9 P.jion():等待所有工作進程退出。此方法只能在close()或teminate()之后調用?
#1 方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具有以下方法 #2 obj.get():返回結果,如果有必要則等待結果到達。timeout是可選的。如果在指定時間內還沒有到達,將引發一場。如果遠程操作中引發了異常,它將在調用此方法時再次被引發。 #3 obj.ready():如果調用完成,返回True #4 obj.successful():如果調用完成且沒有引發異常,返回True,如果在結果就緒之前調用此方法,引發異常 #5 obj.wait([timeout]):等待結果變為可用。 #6 obj.terminate():立即終止所有工作進程,同時不執行任何清理或結束任何掛起工作。如果p被垃圾回收,將自動調用此函數?
代碼實例
進程池和多進程效率對比
同步和異步
#進程池的同步調用 import os,time#引入系統模塊和時間模塊 from multiprocessing import Pool#引入進程池模塊def work(n):#定義一個函數print('%s run' %os.getpid())#打印idtime.sleep(3)#睡3秒return n**2#返回一個n平方if __name__ == '__main__':#如果文件名等于當前文件名p=Pool(3) #進程池中從無到有創建三個進程,以后一直是這三個進程在執行任務res_l=[]#創建一個列表for i in range(10):#循環十個數res=p.apply(work,args=(i,)) # 同步調用,直到本次任務執行完畢拿到res,等待任務work執行的過程中可能有阻塞也可能沒有阻塞# 但不管該任務是否存在阻塞,同步調用都會在原地等著print(res_l)#打印列表?
import os#引入系統模塊 import time#引入時間模塊 import random#引入隨機數模塊 from multiprocessing import Pool#引入進程池模塊def work(n):#定義一個函數print('%s run' %os.getpid())#打印內容time.sleep(random.random())#隨機睡一會return n**2#返回n*nif __name__ == '__main__':#如果文件名等于當前用戶名p=Pool(3) #進程池中從無到有創建三個進程,以后一直是這三個進程在執行任務res_l=[]#得到一個空列表for i in range(10):#循環十個數res=p.apply_async(work,args=(i,)) # 異步運行,根據進程池中有的進程數,每次最多3個子進程在異步執行# 返回結果之后,將結果放入列表,歸還進程,之后再執行新的任務# 需要注意的是,進程池中的三個進程不會同時開啟或者同時結束# 而是執行完一個就釋放一個進程,這個進程就去接收新的任務。 res_l.append(res)# 異步apply_async用法:如果使用異步提交的任務,主進程需要使用jion,等待進程池內任務都處理完,然后可以用get收集結果# 否則,主進程結束,進程池可能還沒來得及執行,也就跟著一起結束了 p.close()p.join()for res in res_l:print(res.get()) #使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get?
練習
?server:進程池版socket并發聊天 ?client發現:并發開啟多個客戶端,服務端同一時間只有4個不同的pid,只能結束一個客戶端,另外一個客戶端才會進來.
進程池的其他實現方式:https://docs.python.org/dev/library/concurrent.futures.html
參考資料http://www.cnblogs.com/linhaifeng/articles/6817679.html
https://www.jianshu.com/p/1200fd49b583
https://www.jianshu.com/p/aed6067eeac9
?
轉載于:https://www.cnblogs.com/chongdongxiaoyu/p/8658379.html
總結
以上是生活随笔為你收集整理的进程同步(multiprocess.Lock、multiprocess.Semaphore、multiprocess.Event) day38的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Mybatis_接口编程
- 下一篇: 吴恩达机器学习笔记(二) —— Logi