日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > python >内容正文

python

python 多进程并发_python并发编程之多进程

發(fā)布時間:2023/12/20 python 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python 多进程并发_python并发编程之多进程 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

一 multiprocessing模塊介紹

python中的多線程無法利用多核優(yōu)勢,如果想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分情況需要使用多進(jìn)程。Python提供了multiprocessing。

multiprocessing模塊用來開啟子進(jìn)程,并在子進(jìn)程中執(zhí)行我們定制的任務(wù)(比如函數(shù)),該模塊與多線程模塊threading的編程接口類似。

multiprocessing模塊的功能眾多:支持子進(jìn)程、通信和共享數(shù)據(jù)、執(zhí)行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。

需要再次強(qiáng)調(diào)的一點是:與線程不同,進(jìn)程沒有任何共享狀態(tài),進(jìn)程修改的數(shù)據(jù),改動僅限于該進(jìn)程內(nèi)。

二 Process類的介紹

創(chuàng)建進(jìn)程的類:

Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化得到的對象,表示一個子進(jìn)程中的任務(wù)(尚未啟動)

強(qiáng)調(diào):

1. 需要使用關(guān)鍵字的方式來指定參數(shù)

2. args指定的為傳給target函數(shù)的位置參數(shù),是一個元組形式,必須有逗號

參數(shù)介紹:

1 group參數(shù)未使用,值始終為None

2

3 target表示調(diào)用對象,即子進(jìn)程要執(zhí)行的任務(wù)

4

5 args表示調(diào)用對象的位置參數(shù)元組,args=(1,2,'egon',)

6

7 kwargs表示調(diào)用對象的字典,kwargs={'name':'egon','age':18}

8

9 name為子進(jìn)程的名稱

方法介紹:

1 p.start():啟動進(jìn)程,并調(diào)用該子進(jìn)程中的p.run()

2 p.run():進(jìn)程啟動時運行的方法,正是它去調(diào)用target指定的函數(shù),我們自定義類的類中一定要實現(xiàn)該方法

3

4 p.terminate():強(qiáng)制終止進(jìn)程p,不會進(jìn)行任何清理操作,如果p創(chuàng)建了子進(jìn)程,該子進(jìn)程就成了僵尸進(jìn)程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會被釋放,進(jìn)而導(dǎo)致死鎖

5 p.is_alive():如果p仍然運行,返回True

6

7 p.join([timeout]):主線程等待p終止(強(qiáng)調(diào):是主線程處于等的狀態(tài),而p是處于運行的狀態(tài))。timeout是可選的超時時間,需要強(qiáng)調(diào)的是,p.join只能join住start開啟的進(jìn)程,而不能join住run開啟的進(jìn)程

屬性介紹:

1 p.daemon:默認(rèn)值為False,如果設(shè)為True,代表p為后臺運行的守護(hù)進(jìn)程,當(dāng)p的父進(jìn)程終止時,p也隨之終止,并且設(shè)定為True后,p不能創(chuàng)建自己的新進(jìn)程,必須在p.start()之前設(shè)置2

3 p.name:進(jìn)程的名稱4

5 p.pid:進(jìn)程的pid6

7 p.exitcode:進(jìn)程在運行時為None、如果為–N,表示被信號N結(jié)束(了解即可)8

9 p.authkey:進(jìn)程的身份驗證鍵,默認(rèn)是由os.urandom()隨機(jī)生成的32字符的字符串。這個鍵的用途是為涉及網(wǎng)絡(luò)連接的底層進(jìn)程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)

三 Process類的使用

注意:在windows中Process()必須放到# if __name__ == '__main__':下

詳細(xì)解釋

創(chuàng)建并開啟子進(jìn)程的兩種方式

方法一

方法二

進(jìn)程直接的內(nèi)存空間是隔離的

View Code

練習(xí)1:把上周所學(xué)的socket通信變成并發(fā)的形式

server端

多個client端

這么實現(xiàn)有沒有問題???

Process對象的join方法

join:主進(jìn)程等,等待子進(jìn)程結(jié)束

有了join,程序不就是串行了嗎???

Process對象的其他方法或?qū)傩?了解)

terminate與is_alive

name與pid

僵尸進(jìn)程與孤兒進(jìn)程(了解)

View Code

四 守護(hù)進(jìn)程

主進(jìn)程創(chuàng)建守護(hù)進(jìn)程

其一:守護(hù)進(jìn)程會在主進(jìn)程代碼執(zhí)行結(jié)束后就終止

其二:守護(hù)進(jìn)程內(nèi)無法再開啟子進(jìn)程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children

注意:進(jìn)程之間是互相獨立的,主進(jìn)程代碼運行結(jié)束,守護(hù)進(jìn)程隨即終止

View Code

迷惑人的例子

五 進(jìn)程同步(鎖)

進(jìn)程之間數(shù)據(jù)不共享,但是共享同一套文件系統(tǒng),所以訪問同一個文件,或同一個打印終端,是沒有問題的,

而共享帶來的是競爭,競爭帶來的結(jié)果就是錯亂,如何控制,就是加鎖處理

part1:多個進(jìn)程共享同一打印終端

并發(fā)運行,效率高,但競爭同一打印終端,帶來了打印錯亂

加鎖:由并發(fā)變成了串行,犧牲了運行效率,但避免了競爭

part2:多個進(jìn)程共享同一文件

文件當(dāng)數(shù)據(jù)庫,模擬搶票

并發(fā)運行,效率高,但競爭寫同一文件,數(shù)據(jù)寫入錯亂

加鎖:購票行為由并發(fā)變成了串行,犧牲了運行效率,但保證了數(shù)據(jù)安全

總結(jié):

#加鎖可以保證多個進(jìn)程修改同一塊數(shù)據(jù)時,同一時間只能有一個任務(wù)可以進(jìn)行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數(shù)據(jù)安全。

雖然可以用文件共享數(shù)據(jù)實現(xiàn)進(jìn)程間通信,但問題是:1.效率低(共享數(shù)據(jù)基于文件,而文件是硬盤上的數(shù)據(jù))2.需要自己加鎖處理#因此我們最好找尋一種解決方案能夠兼顧:1、效率高(多個進(jìn)程共享一塊內(nèi)存的數(shù)據(jù))2、幫我們處理好鎖問題。這就是mutiprocessing模塊為我們提供的基于消息的IPC通信機(jī)制:隊列和管道。

1隊列和管道都是將數(shù)據(jù)存放于內(nèi)存中2 隊列又是基于(管道+鎖)實現(xiàn)的,可以讓我們從復(fù)雜的鎖問題中解脫出來,

我們應(yīng)該盡量避免使用共享數(shù)據(jù),盡可能使用消息傳遞和隊列,避免處理復(fù)雜的同步和鎖問題,而且在進(jìn)程數(shù)目增多時,往往可以獲得更好的可獲展性。

六 隊列(推薦使用)

進(jìn)程彼此之間互相隔離,要實現(xiàn)進(jìn)程間通信(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的

創(chuàng)建隊列的類(底層就是以管道和鎖定的方式實現(xiàn)):

1 Queue([maxsize]):創(chuàng)建共享的進(jìn)程隊列,Queue是多進(jìn)程安全的隊列,可以使用Queue實現(xiàn)多進(jìn)程之間的數(shù)據(jù)傳遞。

參數(shù)介紹:

1 maxsize是隊列中允許最大項數(shù),省略則無大小限制。

方法介紹:

主要方法:

1 q.put方法用以插入數(shù)據(jù)到隊列中,put方法還有兩個可選參數(shù):blocked和timeout。如果blocked為True(默認(rèn)值),并且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。

2 q.get方法可以從隊列讀取并且刪除一個元素。同樣,get方法有兩個可選參數(shù):blocked和timeout。如果blocked為True(默認(rèn)值),并且timeout為正值,那么在等待時間內(nèi)沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常.

3

4 q.get_nowait():同q.get(False)

5 q.put_nowait():同q.put(False)

6

7 q.empty():調(diào)用此方法時q為空則返回True,該結(jié)果不可靠,比如在返回True的過程中,如果隊列中又加入了項目。

8 q.full():調(diào)用此方法時q已滿則返回True,該結(jié)果不可靠,比如在返回True的過程中,如果隊列中的項目被取走。

9 q.qsize():返回隊列中目前項目的正確數(shù)量,結(jié)果也不可靠,理由同q.empty()和q.full()一樣

其他方法(了解):

1 q.cancel_join_thread():不會在進(jìn)程退出時自動連接后臺線程。可以防止join_thread()方法阻塞

2 q.close():關(guān)閉隊列,防止隊列中加入更多數(shù)據(jù)。調(diào)用此方法,后臺線程將繼續(xù)寫入那些已經(jīng)入隊列但尚未寫入的數(shù)據(jù),但將在此方法完成時馬上關(guān)閉。如果q被垃圾收集,將調(diào)用此方法。關(guān)閉隊列不會在隊列使用者中產(chǎn)生任何類型的數(shù)據(jù)結(jié)束信號或異常。例如,如果某個使用者正在被阻塞在get()操作上,關(guān)閉生產(chǎn)者中的隊列不會導(dǎo)致get()方法返回錯誤。

3 q.join_thread():連接隊列的后臺線程。此方法用于在調(diào)用q.close()方法之后,等待所有隊列項被消耗。默認(rèn)情況下,此方法由不是q的原始創(chuàng)建者的所有進(jìn)程調(diào)用。調(diào)用q.cancel_join_thread方法可以禁止這種行為

應(yīng)用:

View Code

生產(chǎn)者消費者模型

在并發(fā)編程中使用生產(chǎn)者和消費者模式能夠解決絕大多數(shù)并發(fā)問題。該模式通過平衡生產(chǎn)線程和消費線程的工作能力來提高程序的整體處理數(shù)據(jù)的速度。

為什么要使用生產(chǎn)者和消費者模式

在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費者就是消費數(shù)據(jù)的線程。在多線程開發(fā)當(dāng)中,如果生產(chǎn)者處理速度很快,而消費者處理速度很慢,那么生產(chǎn)者就必須等待消費者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費者的處理能力大于生產(chǎn)者,那么消費者就必須等待生產(chǎn)者。為了解決這個問題于是引入了生產(chǎn)者和消費者模式。

什么是生產(chǎn)者消費者模式

生產(chǎn)者消費者模式是通過一個容器來解決生產(chǎn)者和消費者的強(qiáng)耦合問題。生產(chǎn)者和消費者彼此之間不直接通訊,而通過阻塞隊列來進(jìn)行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊列里取,阻塞隊列就相當(dāng)于一個緩沖區(qū),平衡了生產(chǎn)者和消費者的處理能力。

基于隊列實現(xiàn)生產(chǎn)者消費者模型

View Code

#生產(chǎn)者消費者模型總結(jié)

#程序中有兩類角色

一類負(fù)責(zé)生產(chǎn)數(shù)據(jù)(生產(chǎn)者)

一類負(fù)責(zé)處理數(shù)據(jù)(消費者)#引入生產(chǎn)者消費者模型為了解決的問題是:

平衡生產(chǎn)者與消費者之間的速度差#如何實現(xiàn):

生產(chǎn)者-》隊列——》消費者#生產(chǎn)者消費者模型實現(xiàn)類程序的解耦和

此時的問題是主進(jìn)程永遠(yuǎn)不會結(jié)束,原因是:生產(chǎn)者p在生產(chǎn)完后就結(jié)束了,但是消費者c在取空了q之后,則一直處于死循環(huán)中且卡在q.get()這一步。

解決方式無非是讓生產(chǎn)者在生產(chǎn)完畢后,往隊列中再發(fā)一個結(jié)束信號,這樣消費者在接收到結(jié)束信號后就可以break出死循環(huán)

生產(chǎn)者在生產(chǎn)完畢后發(fā)送結(jié)束信號None

注意:結(jié)束信號None,不一定要由生產(chǎn)者發(fā),主進(jìn)程里同樣可以發(fā),但主進(jìn)程需要等生產(chǎn)者結(jié)束后才應(yīng)該發(fā)送該信號

主進(jìn)程在生產(chǎn)者生產(chǎn)完畢后發(fā)送結(jié)束信號None

但上述解決方式,在有多個生產(chǎn)者和多個消費者時,我們則需要用一個很low的方式去解決

有幾個消費者就需要發(fā)送幾次結(jié)束信號:相當(dāng)low

其實我們的思路無非是發(fā)送結(jié)束信號而已,有另外一種隊列提供了這種機(jī)制

#JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列允許項目的使用者通知生成者項目已經(jīng)被成功處理。通知進(jìn)程是使用共享的信號和條件變量來實現(xiàn)的。

#參數(shù)介紹:

maxsize是隊列中允許最大項數(shù),省略則無大小限制。#方法介紹:

JoinableQueue的實例p除了與Queue對象相同的方法之外還具有:

q.task_done():使用者使用此方法發(fā)出信號,表示q.get()的返回項目已經(jīng)被處理。如果調(diào)用此方法的次數(shù)大于從隊列中刪除項目的數(shù)量,將引發(fā)ValueError異常

q.join():生產(chǎn)者調(diào)用此方法進(jìn)行阻塞,直到隊列中所有的項目均被處理。阻塞將持續(xù)到隊列中的每個項目均調(diào)用q.task_done()方法為止

View Code

七 管道

進(jìn)程間通信(IPC)方式二:管道(不推薦使用,了解即可)

介紹

基于管道實現(xiàn)進(jìn)程間通信(與隊列的方式是類似的,隊列就是管道加鎖實現(xiàn)的)

注意:生產(chǎn)者和消費者都沒有使用管道的某個端點,就應(yīng)該將其關(guān)閉,如在生產(chǎn)者中關(guān)閉管道的右端,在消費者中關(guān)閉管道的左端。如果忘記執(zhí)行這些步驟,程序可能再消費者中的recv()操作上掛起。管道是由操作系統(tǒng)進(jìn)行引用計數(shù)的,必須在所有進(jìn)程中關(guān)閉管道后才能生產(chǎn)EOFError異常。因此在生產(chǎn)者中關(guān)閉管道不會有任何效果,付費消費者中也關(guān)閉了相同的管道端點。

管道可以用于雙向通信,利用通常在客戶端/服務(wù)器中使用的請求/響應(yīng)模型或遠(yuǎn)程過程調(diào)用,就可以使用管道編寫與進(jìn)程交互的程序

八 共享數(shù)據(jù)

展望未來,基于消息傳遞的并發(fā)編程是大勢所趨

即便是使用線程,推薦做法也是將程序設(shè)計為大量獨立的線程集合

通過消息隊列交換數(shù)據(jù)。這樣極大地減少了對使用鎖定和其他同步手段的需求,

還可以擴(kuò)展到分布式系統(tǒng)中

進(jìn)程間通信應(yīng)該盡量避免使用本節(jié)所講的共享數(shù)據(jù)的方式

進(jìn)程間數(shù)據(jù)是獨立的,可以借助于隊列或管道實現(xiàn)通信,二者都是基于消息傳遞的

雖然進(jìn)程間數(shù)據(jù)獨立,但可以通過Manager實現(xiàn)數(shù)據(jù)共享,事實上Manager的功能遠(yuǎn)不止于此

A manager object returned by Manager() controls a server process which holds Python objectsandallows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Valueand Array. For example,

進(jìn)程之間操作共享的數(shù)據(jù)

九 信號量(了解)

信號量Semahpore(同線程一樣)

十 事件(了解)

Event(同線程一樣)

十一 進(jìn)程池

在利用Python進(jìn)行系統(tǒng)管理的時候,特別是同時操作多個文件目錄,或者遠(yuǎn)程控制多臺主機(jī),并行操作可以節(jié)約大量的時間。多進(jìn)程是實現(xiàn)并發(fā)的手段之一,需要注意的問題是:

很明顯需要并發(fā)執(zhí)行的任務(wù)通常要遠(yuǎn)大于核數(shù)

一個操作系統(tǒng)不可能無限開啟進(jìn)程,通常有幾個核就開幾個進(jìn)程

進(jìn)程開啟過多,效率反而會下降(開啟進(jìn)程是需要占用系統(tǒng)資源的,而且開啟多余核數(shù)目的進(jìn)程也無法做到并行)

例如當(dāng)被操作對象數(shù)目不大時,可以直接利用multiprocessing中的Process動態(tài)成生多個進(jìn)程,十幾個還好,但如果是上百個,上千個。。。手動的去限制進(jìn)程數(shù)量卻又太過繁瑣,此時可以發(fā)揮進(jìn)程池的功效。

我們就可以通過維護(hù)一個進(jìn)程池來控制進(jìn)程數(shù)目,比如httpd的進(jìn)程模式,規(guī)定最小進(jìn)程數(shù)和最大進(jìn)程數(shù)...

ps:對于遠(yuǎn)程過程調(diào)用的高級應(yīng)用程序而言,應(yīng)該使用進(jìn)程池,Pool可以提供指定數(shù)量的進(jìn)程,供用戶調(diào)用,當(dāng)有新的請求提交到pool中時,如果池還沒有滿,那么就會創(chuàng)建一個新的進(jìn)程用來執(zhí)行該請求;但如果池中的進(jìn)程數(shù)已經(jīng)達(dá)到規(guī)定最大值,那么該請求就會等待,直到池中有進(jìn)程結(jié)束,就重用進(jìn)程池中的進(jìn)程。

創(chuàng)建進(jìn)程池的類:如果指定numprocess為3,則進(jìn)程池會從無到有創(chuàng)建三個進(jìn)程,然后自始至終使用這三個進(jìn)程去執(zhí)行所有任務(wù),不會開啟其他進(jìn)程

1 Pool([numprocess [,initializer [, initargs]]]):創(chuàng)建進(jìn)程池

參數(shù)介紹:

1 numprocess:要創(chuàng)建的進(jìn)程數(shù),如果省略,將默認(rèn)使用cpu_count()的值

2 initializer:是每個工作進(jìn)程啟動時要執(zhí)行的可調(diào)用對象,默認(rèn)為None

3 initargs:是要傳給initializer的參數(shù)組

方法介紹:

主要方法:

1 p.apply(func [, args [, kwargs]]):在一個池工作進(jìn)程中執(zhí)行func(*args,**kwargs),然后返回結(jié)果。需要強(qiáng)調(diào)的是:此操作并不會在所有池工作進(jìn)程中并執(zhí)行func函數(shù)。如果要通過不同參數(shù)并發(fā)地執(zhí)行func函數(shù),必須從不同線程調(diào)用p.apply()函數(shù)或者使用p.apply_async()2 p.apply_async(func [, args [, kwargs]]):在一個池工作進(jìn)程中執(zhí)行func(*args,**kwargs),然后返回結(jié)果。此方法的結(jié)果是AsyncResult類的實例,callback是可調(diào)用對象,接收輸入?yún)?shù)。當(dāng)func的結(jié)果變?yōu)榭捎脮r,將理解傳遞給callback。callback禁止執(zhí)行任何阻塞操作,否則將接收其他異步操作中的結(jié)果。3

4 p.close():關(guān)閉進(jìn)程池,防止進(jìn)一步操作。如果所有操作持續(xù)掛起,它們將在工作進(jìn)程終止前完成5 P.jion():等待所有工作進(jìn)程退出。此方法只能在close()或teminate()之后調(diào)用

其他方法(了解部分)

方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具有以下方法

obj.get():返回結(jié)果,如果有必要則等待結(jié)果到達(dá)。timeout是可選的。如果在指定時間內(nèi)還沒有到達(dá),將引發(fā)一場。如果遠(yuǎn)程操作中引發(fā)了異常,它將在調(diào)用此方法時再次被引發(fā)。

obj.ready():如果調(diào)用完成,返回True

obj.successful():如果調(diào)用完成且沒有引發(fā)異常,返回True,如果在結(jié)果就緒之前調(diào)用此方法,引發(fā)異常

obj.wait([timeout]):等待結(jié)果變?yōu)榭捎谩?/p>

obj.terminate():立即終止所有工作進(jìn)程,同時不執(zhí)行任何清理或結(jié)束任何掛起工作。如果p被垃圾回收,將自動調(diào)用此函數(shù)

View Code

應(yīng)用:

from multiprocessing importPoolimportos,timedefwork(n):print('%s run' %os.getpid())

time.sleep(3)return n**2

if __name__ == '__main__':

p=Pool(3) #進(jìn)程池中從無到有創(chuàng)建三個進(jìn)程,以后一直是這三個進(jìn)程在執(zhí)行任務(wù)

res_l=[]for i in range(10):

res=p.apply(work,args=(i,)) #同步調(diào)用,直到本次任務(wù)執(zhí)行完畢拿到res,等待任務(wù)work執(zhí)行的過程中可能有阻塞也可能沒有阻塞,但不管該任務(wù)是否存在阻塞,同步調(diào)用都會在原地等著,只是等的過程中若是任務(wù)發(fā)生了阻塞就會被奪走cpu的執(zhí)行權(quán)限

res_l.append(res)print(res_l)

同步調(diào)用apply

from multiprocessing importPoolimportos,timedefwork(n):print('%s run' %os.getpid())

time.sleep(3)return n**2

if __name__ == '__main__':

p=Pool(3) #進(jìn)程池中從無到有創(chuàng)建三個進(jìn)程,以后一直是這三個進(jìn)程在執(zhí)行任務(wù)

res_l=[]for i in range(10):

res=p.apply_async(work,args=(i,)) #同步運行,阻塞、直到本次任務(wù)執(zhí)行完畢拿到res

res_l.append(res)#異步apply_async用法:如果使用異步提交的任務(wù),主進(jìn)程需要使用jion,等待進(jìn)程池內(nèi)任務(wù)都處理完,然后可以用get收集結(jié)果,否則,主進(jìn)程結(jié)束,進(jìn)程池可能還沒來得及執(zhí)行,也就跟著一起結(jié)束了

p.close()

p.join()for res inres_l:print(res.get()) #使用get來獲取apply_aync的結(jié)果,如果是apply,則沒有g(shù)et方法,因為apply是同步執(zhí)行,立刻獲取結(jié)果,也根本無需get

異步調(diào)用apply_async

#一:使用進(jìn)程池(異步調(diào)用,apply_async)#coding: utf-8

from multiprocessing importProcess,Poolimporttimedeffunc(msg):print( "msg:", msg)

time.sleep(1)returnmsgif __name__ == "__main__":

pool= Pool(processes = 3)

res_l=[]for i in range(10):

msg= "hello %d" %(i)

res=pool.apply_async(func, (msg, )) #維持執(zhí)行的進(jìn)程總數(shù)為processes,當(dāng)一個進(jìn)程執(zhí)行完畢后會添加新的進(jìn)程進(jìn)去

res_l.append(res)print("==============================>") #沒有后面的join,或get,則程序整體結(jié)束,進(jìn)程池中的任務(wù)還沒來得及全部執(zhí)行完也都跟著主進(jìn)程一起結(jié)束了

pool.close()#關(guān)閉進(jìn)程池,防止進(jìn)一步操作。如果所有操作持續(xù)掛起,它們將在工作進(jìn)程終止前完成

pool.join() #調(diào)用join之前,先調(diào)用close函數(shù),否則會出錯。執(zhí)行完close后不會有新的進(jìn)程加入到pool,join函數(shù)等待所有子進(jìn)程結(jié)束

print(res_l) #看到的是對象組成的列表,而非最終的結(jié)果,但這一步是在join后執(zhí)行的,證明結(jié)果已經(jīng)計算完畢,剩下的事情就是調(diào)用每個對象下的get方法去獲取結(jié)果

for i inres_l:print(i.get()) #使用get來獲取apply_aync的結(jié)果,如果是apply,則沒有g(shù)et方法,因為apply是同步執(zhí)行,立刻獲取結(jié)果,也根本無需get

#二:使用進(jìn)程池(同步調(diào)用,apply)#coding: utf-8

from multiprocessing importProcess,Poolimporttimedeffunc(msg):print( "msg:", msg)

time.sleep(0.1)returnmsgif __name__ == "__main__":

pool= Pool(processes = 3)

res_l=[]for i in range(10):

msg= "hello %d" %(i)

res=pool.apply(func, (msg, )) #維持執(zhí)行的進(jìn)程總數(shù)為processes,當(dāng)一個進(jìn)程執(zhí)行完畢后會添加新的進(jìn)程進(jìn)去

res_l.append(res) #同步執(zhí)行,即執(zhí)行完一個拿到結(jié)果,再去執(zhí)行另外一個

print("==============================>")

pool.close()

pool.join()#調(diào)用join之前,先調(diào)用close函數(shù),否則會出錯。執(zhí)行完close后不會有新的進(jìn)程加入到pool,join函數(shù)等待所有子進(jìn)程結(jié)束

print(res_l) #看到的就是最終的結(jié)果組成的列表

for i in res_l: #apply是同步的,所以直接得到結(jié)果,沒有g(shù)et()方法

print(i)

詳解:apply_async與apply

練習(xí)2:使用進(jìn)程池維護(hù)固定數(shù)目的進(jìn)程(重寫練習(xí)1)

#Pool內(nèi)的進(jìn)程數(shù)默認(rèn)是cpu核數(shù),假設(shè)為4(查看方法os.cpu_count())#開啟6個客戶端,會發(fā)現(xiàn)2個客戶端處于等待狀態(tài)#在每個進(jìn)程內(nèi)查看pid,會發(fā)現(xiàn)pid使用為4個,即多個客戶端公用4個進(jìn)程

from socket import *

from multiprocessing importPoolimportos

server=socket(AF_INET,SOCK_STREAM)

server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)

server.bind(('127.0.0.1',8080))

server.listen(5)deftalk(conn,client_addr):print('進(jìn)程pid: %s' %os.getpid())whileTrue:try:

msg=conn.recv(1024)if not msg:breakconn.send(msg.upper())exceptException:break

if __name__ == '__main__':

p=Pool()whileTrue:

conn,client_addr=server.accept()

p.apply_async(talk,args=(conn,client_addr))#p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個客戶端能訪問

server端

from socket import *client=socket(AF_INET,SOCK_STREAM)

client.connect(('127.0.0.1',8080))whileTrue:

msg=input('>>:').strip()if not msg:continueclient.send(msg.encode('utf-8'))

msg=client.recv(1024)print(msg.decode('utf-8'))

客戶端

發(fā)現(xiàn):并發(fā)開啟多個客戶端,服務(wù)端同一時間只有3個不同的pid,干掉一個客戶端,另外一個客戶端才會進(jìn)來,被3個進(jìn)程之一處理

回掉函數(shù):

需要回調(diào)函數(shù)的場景:進(jìn)程池中任何一個任務(wù)一旦處理完了,就立即告知主進(jìn)程:我好了額,你可以處理我的結(jié)果了。主進(jìn)程則調(diào)用一個函數(shù)去處理該結(jié)果,該函數(shù)即回調(diào)函數(shù)

我們可以把耗時間(阻塞)的任務(wù)放到進(jìn)程池中,然后指定回調(diào)函數(shù)(主進(jìn)程負(fù)責(zé)執(zhí)行),這樣主進(jìn)程在執(zhí)行回調(diào)函數(shù)時就省去了I/O的過程,直接拿到的是任務(wù)的結(jié)果。

from multiprocessing importPoolimportrequestsimportjsonimportosdefget_page(url):print(' get %s' %(os.getpid(),url))

respone=requests.get(url)if respone.status_code == 200:return {'url':url,'text':respone.text}defpasrse_page(res):print(' parse %s' %(os.getpid(),res['url']))

parse_res='url: size:[%s]\n' %(res['url'],len(res['text']))

with open('db.txt','a') as f:

f.write(parse_res)if __name__ == '__main__':

urls=['https://www.baidu.com','https://www.python.org','https://www.openstack.org','https://help.github.com/','http://www.sina.com.cn/']

p=Pool(3)

res_l=[]for url inurls:

res=p.apply_async(get_page,args=(url,),callback=pasrse_page)

res_l.append(res)

p.close()

p.join()print([res.get() for res in res_l]) #拿到的是get_page的結(jié)果,其實完全沒必要拿該結(jié)果,該結(jié)果已經(jīng)傳給回調(diào)函數(shù)處理了

'''打印結(jié)果:

get https://www.baidu.com

get https://www.python.org

get https://www.openstack.org

get https://help.github.com/

parse https://www.baidu.com

get http://www.sina.com.cn/

parse https://www.python.org

parse https://help.github.com/

parse http://www.sina.com.cn/

parse https://www.openstack.org

[{'url': 'https://www.baidu.com', 'text': '\r\n...',...}]'''

View Code

from multiprocessing importPoolimporttime,randomimportrequestsimportredefget_page(url,pattern):

response=requests.get(url)if response.status_code == 200:return(response.text,pattern)defparse_page(info):

page_content,pattern=info

res=re.findall(pattern,page_content)for item inres:

dic={'index':item[0],'title':item[1],'actor':item[2].strip()[3:],'time':item[3][5:],'score':item[4]+item[5]

}print(dic)if __name__ == '__main__':

pattern1=re.compile(r'

.*?board-index.*?>(\d+)<.>(.*?)<.>(.*?)<.>(.*?)<.>(.*?)

url_dic={'http://maoyan.com/board/7':pattern1,

}

p=Pool()

res_l=[]for url,pattern inurl_dic.items():

res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)

res_l.append(res)for i inres_l:

i.get()#res=requests.get('http://maoyan.com/board/7')

#print(re.findall(pattern,res.text))

爬蟲案例

如果在主進(jìn)程中等待進(jìn)程池中所有任務(wù)都執(zhí)行完畢后,再統(tǒng)一處理結(jié)果,則無需回調(diào)函數(shù)

from multiprocessing importPoolimporttime,random,osdefwork(n):

time.sleep(1)return n**2

if __name__ == '__main__':

p=Pool()

res_l=[]for i in range(10):

res=p.apply_async(work,args=(i,))

res_l.append(res)

p.close()

p.join()#等待進(jìn)程池中所有進(jìn)程執(zhí)行完畢

nums=[]for res inres_l:

nums.append(res.get())#拿到所有結(jié)果

print(nums) #主進(jìn)程拿到所有的處理結(jié)果,可以在主進(jìn)程中進(jìn)行統(tǒng)一進(jìn)行處理

View Code

總結(jié)

以上是生活随笔為你收集整理的python 多进程并发_python并发编程之多进程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。