Cpython解释器支持的进程与线程
一、理論部分
一 什么是進(jìn)程
? ? 進(jìn)程:正在進(jìn)行的一個(gè)過程或者說一個(gè)任務(wù)。而負(fù)責(zé)執(zhí)行任務(wù)則是cpu。
? ? 舉例(單核+多道,實(shí)現(xiàn)多個(gè)進(jìn)程的并發(fā)執(zhí)行):
? ? egon在一個(gè)時(shí)間段內(nèi)有很多任務(wù)要做:python備課的任務(wù),寫書的任務(wù),交女朋友的任務(wù),王者榮耀上分的任務(wù),
? ? 但egon同一時(shí)刻只能做一個(gè)任務(wù)(cpu同一時(shí)間只能干一個(gè)活),如何才能玩出多個(gè)任務(wù)并發(fā)執(zhí)行的效果?
? ? egon備一會(huì)課,再去跟李杰的女朋友聊聊天,再去打一會(huì)王者榮耀....這就保證了每個(gè)任務(wù)都在進(jìn)行中.
二 進(jìn)程與程序的區(qū)別
程序僅僅只是一堆代碼而已,而進(jìn)程指的是程序的運(yùn)行過程。
舉例:
想象一位有一手好廚藝的計(jì)算機(jī)科學(xué)家egon正在為他的女兒元昊烘制生日蛋糕。
他有做生日蛋糕的食譜,
廚房里有所需的原料:面粉、雞蛋、韭菜,蒜泥等。
在這個(gè)比喻中:
? ? 做蛋糕的食譜就是程序(即用適當(dāng)形式描述的算法)
? ? 計(jì)算機(jī)科學(xué)家就是處理器(cpu)
? ? 而做蛋糕的各種原料就是輸入數(shù)據(jù)。
? ?進(jìn)程就是廚師閱讀食譜、取來各種原料以及烘制蛋糕等一系列動(dòng)作的總和。
?
現(xiàn)在假設(shè)計(jì)算機(jī)科學(xué)家egon的兒子alex哭著跑了進(jìn)來,說:XXXXXXXXXXXXXX。
科學(xué)家egon想了想,處理兒子alex蟄傷的任務(wù)比給女兒元昊做蛋糕的任務(wù)更重要,于是
計(jì)算機(jī)科學(xué)家就記錄下他照著食譜做到哪兒了(保存進(jìn)程的當(dāng)前狀態(tài)),然后拿出一本急救手冊(cè),按照其中的指示處理蟄傷。這里,我們看到處理機(jī)從一個(gè)進(jìn)程(做蛋糕)切換到另一個(gè)高優(yōu)先級(jí)的進(jìn)程(實(shí)施醫(yī)療救治),每個(gè)進(jìn)程擁有各自的程序(食譜和急救手冊(cè))。當(dāng)蜜蜂蟄傷處理完之后,這位計(jì)算機(jī)科學(xué)家又回來做蛋糕,從他
離開時(shí)的那一步繼續(xù)做下去。
需要強(qiáng)調(diào)的是:同一個(gè)程序執(zhí)行兩次,那也是兩個(gè)進(jìn)程,比如打開暴風(fēng)影音,雖然都是同一個(gè)軟件,但是一個(gè)可以播放蒼井空,一個(gè)可以播放飯島愛。
三 并發(fā)與并行
無論是并行還是并發(fā),在用戶看來都是'同時(shí)'運(yùn)行的,不管是進(jìn)程還是線程,都只是一個(gè)任務(wù)而已,真是干活的是cpu,cpu來做這些任務(wù),而一個(gè)cpu同一時(shí)刻只能執(zhí)行一個(gè)任務(wù)
? ? ? 一 并發(fā):是偽并行,即看起來是同時(shí)運(yùn)行。單個(gè)cpu+多道技術(shù)就可以實(shí)現(xiàn)并發(fā),(并行也屬于并發(fā))
? ? ?二 并行:同時(shí)運(yùn)行,只有具備多個(gè)cpu才能實(shí)現(xiàn)并行
? ? ? ? ?單核下,可以利用多道技術(shù),多個(gè)核,每個(gè)核也都可以利用多道技術(shù)(多道技術(shù)是針對(duì)單核而言的)
? ? ? ? ?有四個(gè)核,六個(gè)任務(wù),這樣同一時(shí)間有四個(gè)任務(wù)被執(zhí)行,假設(shè)分別被分配給了cpu1,cpu2,cpu3,cpu4,
? ? ? ? ?一旦任務(wù)1遇到I/O就被迫中斷執(zhí)行,此時(shí)任務(wù)5就拿到cpu1的時(shí)間片去執(zhí)行,這就是單核下的多道技術(shù)
? ? ? ? ?而一旦任務(wù)1的I/O結(jié)束了,操作系統(tǒng)會(huì)重新調(diào)用它(需知進(jìn)程的調(diào)度、分配給哪個(gè)cpu運(yùn)行,由操作系統(tǒng)說了算),可能被分配給四個(gè)cpu中的任意一個(gè)去執(zhí)行
??
所有現(xiàn)代計(jì)算機(jī)經(jīng)常會(huì)在同一時(shí)間做很多件事,一個(gè)用戶的PC(無論是單cpu還是多cpu),都可以同時(shí)運(yùn)行多個(gè)任務(wù)(一個(gè)任務(wù)可以理解為一個(gè)進(jìn)程)。
啟動(dòng)一個(gè)進(jìn)程來殺毒(360軟件)
啟動(dòng)一個(gè)進(jìn)程來看電影(暴風(fēng)影音)
啟動(dòng)一個(gè)進(jìn)程來聊天(騰訊QQ)
所有的這些進(jìn)程都需被管理,于是一個(gè)支持多進(jìn)程的多道程序系統(tǒng)是至關(guān)重要的
多道技術(shù)概念回顧:內(nèi)存中同時(shí)存入多道(多個(gè))程序,cpu從一個(gè)進(jìn)程快速切換到另外一個(gè),使每個(gè)進(jìn)程各自運(yùn)行幾十或幾百毫秒,這樣,雖然在某一個(gè)瞬間,一個(gè)cpu只能執(zhí)行一個(gè)任務(wù),但在1秒內(nèi),cpu卻可以運(yùn)行多個(gè)進(jìn)程,這就給人產(chǎn)生了并行的錯(cuò)覺,即偽并發(fā),以此來區(qū)分多處理器操作系統(tǒng)的真正硬件并行(多個(gè)cpu共享同一個(gè)物理內(nèi)存)
四 同步與異步
同步執(zhí)行:一個(gè)進(jìn)程在執(zhí)行某個(gè)任務(wù)時(shí),另外一個(gè)進(jìn)程必須等待其執(zhí)行完畢,才能繼續(xù)執(zhí)行
異步執(zhí)行:一個(gè)進(jìn)程在執(zhí)行某個(gè)任務(wù)時(shí),另外一個(gè)進(jìn)程無需等待其執(zhí)行完畢,就可以繼續(xù)執(zhí)行,當(dāng)有消息返回時(shí),系統(tǒng)會(huì)通知后者進(jìn)行處理,這樣可以提高執(zhí)行效率
舉個(gè)例子,打電話時(shí)就是同步通信,發(fā)短息時(shí)就是異步通信。
同步\異步and阻塞\非阻塞(重點(diǎn))
同步:
#所謂同步,就是在發(fā)出一個(gè)功能調(diào)用時(shí),在沒有得到結(jié)果之前,該調(diào)用就不會(huì)返回。按照這個(gè)定義,其實(shí)絕大多數(shù)函數(shù)都是同步調(diào)用。但是一般而言,我們?cè)谡f同步、異步的時(shí)候,特指那些需要其他部件協(xié)作或者需要一定時(shí)間完成的任務(wù)。 #舉例: #1. multiprocessing.Pool下的apply #發(fā)起同步調(diào)用后,就在原地等著任務(wù)結(jié)束,根本不考慮任務(wù)是在計(jì)算還是在io阻塞,總之就是一股腦地等任務(wù)結(jié)束 #2. concurrent.futures.ProcessPoolExecutor().submit(func,).result() #3. concurrent.futures.ThreadPoolExecutor().submit(func,).result()異步:
#異步的概念和同步相對(duì)。當(dāng)一個(gè)異步功能調(diào)用發(fā)出后,調(diào)用者不能立刻得到結(jié)果。當(dāng)該異步功能完成后,通過狀態(tài)、通知或回調(diào)來通知調(diào)用者。如果異步功能用狀態(tài)來通知,那么調(diào)用者就需要每隔一定時(shí)間檢查一次,效率就很低(有些初學(xué)多線程編程的人,總喜歡用一個(gè)循環(huán)去檢查某個(gè)變量的值,這其實(shí)是一 種很嚴(yán)重的錯(cuò)誤)。如果是使用通知的方式,效率則很高,因?yàn)楫惒焦δ軒缀醪恍枰鲱~外的操作。至于回調(diào)函數(shù),其實(shí)和通知沒太多區(qū)別。 #舉例: #1. multiprocessing.Pool().apply_async() #發(fā)起異步調(diào)用后,并不會(huì)等待任務(wù)結(jié)束才返回,相反,會(huì)立即獲取一個(gè)臨時(shí)結(jié)果(并不是最終的結(jié)果,可能是封裝好的一個(gè)對(duì)象)。 #2. concurrent.futures.ProcessPoolExecutor(3).submit(func,) #3. concurrent.futures.ThreadPoolExecutor(3).submit(func,)阻塞:
#阻塞調(diào)用是指調(diào)用結(jié)果返回之前,當(dāng)前線程會(huì)被掛起(如遇到io操作)。函數(shù)只有在得到結(jié)果之后才會(huì)將阻塞的線程激活。有人也許會(huì)把阻塞調(diào)用和同步調(diào)用等同起來,實(shí)際上他是不同的。對(duì)于同步調(diào)用來說,很多時(shí)候當(dāng)前線程還是激活的,只是從邏輯上當(dāng)前函數(shù)沒有返回而已。 #舉例: #1. 同步調(diào)用:apply一個(gè)累計(jì)1億次的任務(wù),該調(diào)用會(huì)一直等待,直到任務(wù)返回結(jié)果為止,但并未阻塞住(即便是被搶走cpu的執(zhí)行權(quán)限,那也是處于就緒態(tài)); #2. 阻塞調(diào)用:當(dāng)socket工作在阻塞模式的時(shí)候,如果沒有數(shù)據(jù)的情況下調(diào)用recv函數(shù),則當(dāng)前線程就會(huì)被掛起,直到有數(shù)據(jù)為止。非阻塞:
#非阻塞和阻塞的概念相對(duì)應(yīng),指在不能立刻得到結(jié)果之前也會(huì)立刻返回,同時(shí)該函數(shù)不會(huì)阻塞當(dāng)前線程。小結(jié):
#1. 同步與異步針對(duì)的是函數(shù)/任務(wù)的調(diào)用方式:同步就是當(dāng)一個(gè)進(jìn)程發(fā)起一個(gè)函數(shù)(任務(wù))調(diào)用的時(shí)候,一直等到函數(shù)(任務(wù))完成,而進(jìn)程繼續(xù)處于激活狀態(tài)。而異步情況下是當(dāng)一個(gè)進(jìn)程發(fā)起一個(gè)函數(shù)(任務(wù))調(diào)用的時(shí)候,不會(huì)等函數(shù)返回,而是繼續(xù)往下執(zhí)行當(dāng),函數(shù)返回的時(shí)候通過狀態(tài)、通知、事件等方式通知進(jìn)程任務(wù)完成。#2. 阻塞與非阻塞針對(duì)的是進(jìn)程或線程:阻塞是當(dāng)請(qǐng)求不能滿足的時(shí)候就將進(jìn)程掛起,而非阻塞則不會(huì)阻塞當(dāng)前進(jìn)程?
五 進(jìn)程的創(chuàng)建(了解)
但凡是硬件,都需要有操作系統(tǒng)去管理,只要有操作系統(tǒng),就有進(jìn)程的概念,就需要有創(chuàng)建進(jìn)程的方式,一些操作系統(tǒng)只為一個(gè)應(yīng)用程序設(shè)計(jì),比如微波爐中的控制器,一旦啟動(dòng)微波爐,所有的進(jìn)程都已經(jīng)存在。
而對(duì)于通用系統(tǒng)(跑很多應(yīng)用程序),需要有系統(tǒng)運(yùn)行過程中創(chuàng)建或撤銷進(jìn)程的能力,主要分為4中形式創(chuàng)建新的進(jìn)程
1. 系統(tǒng)初始化(查看進(jìn)程linux中用ps命令,windows中用任務(wù)管理器,前臺(tái)進(jìn)程負(fù)責(zé)與用戶交互,后臺(tái)運(yùn)行的進(jìn)程與用戶無關(guān),運(yùn)行在后臺(tái)并且只在需要時(shí)才喚醒的進(jìn)程,稱為守護(hù)進(jìn)程,如電子郵件、web頁面、新聞、打印)
2. 一個(gè)進(jìn)程在運(yùn)行過程中開啟了子進(jìn)程(如nginx開啟多進(jìn)程,os.fork,subprocess.Popen等)
3. 用戶的交互式請(qǐng)求,而創(chuàng)建一個(gè)新進(jìn)程(如用戶雙擊暴風(fēng)影音)
4. 一個(gè)批處理作業(yè)的初始化(只在大型機(jī)的批處理系統(tǒng)中應(yīng)用)
無論哪一種,新進(jìn)程的創(chuàng)建都是由一個(gè)已經(jīng)存在的進(jìn)程執(zhí)行了一個(gè)用于創(chuàng)建進(jìn)程的系統(tǒng)調(diào)用而創(chuàng)建的:
1. 在UNIX中該系統(tǒng)調(diào)用是:fork,fork會(huì)創(chuàng)建一個(gè)與父進(jìn)程一模一樣的副本,二者有相同的存儲(chǔ)映像、同樣的環(huán)境字符串和同樣的打開文件(在shell解釋器進(jìn)程中,執(zhí)行一個(gè)命令就會(huì)創(chuàng)建一個(gè)子進(jìn)程)
2. 在windows中該系統(tǒng)調(diào)用是:CreateProcess,CreateProcess既處理進(jìn)程的創(chuàng)建,也負(fù)責(zé)把正確的程序裝入新進(jìn)程。
關(guān)于創(chuàng)建的子進(jìn)程,UNIX和windows
1.相同的是:進(jìn)程創(chuàng)建后,父進(jìn)程和子進(jìn)程有各自不同的地址空間(多道技術(shù)要求物理層面實(shí)現(xiàn)進(jìn)程之間內(nèi)存的隔離),任何一個(gè)進(jìn)程的在其地址空間中的修改都不會(huì)影響到另外一個(gè)進(jìn)程。
2.不同的是:在UNIX中,子進(jìn)程的初始地址空間是父進(jìn)程的一個(gè)副本,提示:子進(jìn)程和父進(jìn)程是可以有只讀的共享內(nèi)存區(qū)的。但是對(duì)于windows系統(tǒng)來說,從一開始父進(jìn)程與子進(jìn)程的地址空間就是不同的。
六 進(jìn)程的終止(了解)
1. 正常退出(自愿,如用戶點(diǎn)擊交互式頁面的叉號(hào),或程序執(zhí)行完畢調(diào)用發(fā)起系統(tǒng)調(diào)用正常退出,在linux中用exit,在windows中用ExitProcess)
2. 出錯(cuò)退出(自愿,python a.py中a.py不存在)
3. 嚴(yán)重錯(cuò)誤(非自愿,執(zhí)行非法指令,如引用不存在的內(nèi)存,1/0等,可以捕捉異常,try...except...)
4. 被其他進(jìn)程殺死(非自愿,如kill -9)
七 進(jìn)程的層次結(jié)構(gòu)
無論UNIX還是windows,進(jìn)程只有一個(gè)父進(jìn)程,不同的是:
1. 在UNIX中所有的進(jìn)程,都是以init進(jìn)程為根,組成樹形結(jié)構(gòu)。父子進(jìn)程共同組成一個(gè)進(jìn)程組,這樣,當(dāng)從鍵盤發(fā)出一個(gè)信號(hào)時(shí),該信號(hào)被送給當(dāng)前與鍵盤相關(guān)的進(jìn)程組中的所有成員。
2. 在windows中,沒有進(jìn)程層次的概念,所有的進(jìn)程都是地位相同的,唯一類似于進(jìn)程層次的暗示,是在創(chuàng)建進(jìn)程時(shí),父進(jìn)程得到一個(gè)特別的令牌(稱為句柄),該句柄可以用來控制子進(jìn)程,但是父進(jìn)程有權(quán)把該句柄傳給其他子進(jìn)程,這樣就沒有層次了。
八 進(jìn)程的狀態(tài)
tail -f access.log |grep '404'
執(zhí)行程序tail,開啟一個(gè)子進(jìn)程,執(zhí)行程序grep,開啟另外一個(gè)子進(jìn)程,兩個(gè)進(jìn)程之間基于管道'|'通訊,將tail的結(jié)果作為grep的輸入。
進(jìn)程grep在等待輸入(即I/O)時(shí)的狀態(tài)稱為阻塞,此時(shí)grep命令都無法運(yùn)行
其實(shí)在兩種情況下會(huì)導(dǎo)致一個(gè)進(jìn)程在邏輯上不能運(yùn)行,
1. 進(jìn)程掛起是自身原因,遇到I/O阻塞,便要讓出CPU讓其他進(jìn)程去執(zhí)行,這樣保證CPU一直在工作
2. 與進(jìn)程無關(guān),是操作系統(tǒng)層面,可能會(huì)因?yàn)橐粋€(gè)進(jìn)程占用時(shí)間過多,或者優(yōu)先級(jí)等原因,而調(diào)用其他的進(jìn)程去使用CPU。
因而一個(gè)進(jìn)程由三種狀態(tài)
九 進(jìn)程并發(fā)的實(shí)現(xiàn)(了解)
進(jìn)程并發(fā)的實(shí)現(xiàn)在于,硬件中斷一個(gè)正在運(yùn)行的進(jìn)程,把此時(shí)進(jìn)程運(yùn)行的所有狀態(tài)保存下來,為此,操作系統(tǒng)維護(hù)一張表格,即進(jìn)程表(process table),每個(gè)進(jìn)程占用一個(gè)進(jìn)程表項(xiàng)(這些表項(xiàng)也稱為進(jìn)程控制塊)
該表存放了進(jìn)程狀態(tài)的重要信息:程序計(jì)數(shù)器、堆棧指針、內(nèi)存分配狀況、所有打開文件的狀態(tài)、帳號(hào)和調(diào)度信息,以及其他在進(jìn)程由運(yùn)行態(tài)轉(zhuǎn)為就緒態(tài)或阻塞態(tài)時(shí),必須保存的信息,從而保證該進(jìn)程在再次啟動(dòng)時(shí),就像從未被中斷過一樣。
二、代碼知識(shí)部分
一 multiprocessing模塊介紹:
python中的多線程無法利用多核優(yōu)勢(shì),如果想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分情況需要使用多進(jìn)程。Python提供了multiprocessing。
? ? multiprocessing模塊用來開啟子進(jìn)程,并在子進(jìn)程中執(zhí)行我們定制的任務(wù)(比如函數(shù)),該模塊與多線程模塊threading的編程接口類似。
?multiprocessing模塊的功能眾多:支持子進(jìn)程、通信和共享數(shù)據(jù)、執(zhí)行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。
? ? 需要再次強(qiáng)調(diào)的一點(diǎn)是:與線程不同,進(jìn)程沒有任何共享狀態(tài),進(jìn)程修改的數(shù)據(jù),改動(dòng)僅限于該進(jìn)程內(nèi)。
?
二 Process類的介紹
?創(chuàng)建進(jìn)程的類:
Process([group [, target [, name [, args [, kwargs]]]]]),由該類實(shí)例化得到的對(duì)象,表示一個(gè)子進(jìn)程中的任務(wù)(尚未啟動(dòng))強(qiáng)調(diào): 1. 需要使用關(guān)鍵字的方式來指定參數(shù) 2. args指定的為傳給target函數(shù)的位置參數(shù),是一個(gè)元組形式,必須有逗號(hào)??參數(shù)介紹:
group參數(shù)未使用,值始終為None
target表示調(diào)用對(duì)象,即子進(jìn)程要執(zhí)行的任務(wù)
args表示調(diào)用對(duì)象的位置參數(shù)元組,args=(1,2,'egon',)
kwargs表示調(diào)用對(duì)象的字典,kwargs={'name':'egon','age':18}
name為子進(jìn)程的名稱
?
?方法介紹:
p.start():啟動(dòng)進(jìn)程,并調(diào)用該子進(jìn)程中的p.run()
p.run():進(jìn)程啟動(dòng)時(shí)運(yùn)行的方法,正是它去調(diào)用target指定的函數(shù),我們自定義類的類中一定要實(shí)現(xiàn)該方法
p.terminate():強(qiáng)制終止進(jìn)程p,不會(huì)進(jìn)行任何清理操作,如果p創(chuàng)建了子進(jìn)程,該子進(jìn)程就成了僵尸進(jìn)程,使用該方法需要特別小心這種情況。如果p還保存了一個(gè)鎖那么也將不會(huì)被釋放,進(jìn)而導(dǎo)致死鎖
p.is_alive():如果p仍然運(yùn)行,返回True
p.join([timeout]):主線程等待p終止(強(qiáng)調(diào):是主線程處于等的狀態(tài),而p是處于運(yùn)行的狀態(tài))。timeout是可選的超時(shí)時(shí)間,需要強(qiáng)調(diào)的是,p.join只能join住start開啟的進(jìn)程,而不能join住run開啟的進(jìn)程
屬性介紹:
1 p.daemon:默認(rèn)值為False,如果設(shè)為True,代表p為后臺(tái)運(yùn)行的守護(hù)進(jìn)程,當(dāng)p的父進(jìn)程終止時(shí),p也隨之終止,并且設(shè)定為True后,p不能創(chuàng)建自己的新進(jìn)程,必須在p.start()之前設(shè)置 2 3 p.name:進(jìn)程的名稱 4 5 p.pid:進(jìn)程的pid 6 7 p.exitcode:進(jìn)程在運(yùn)行時(shí)為None、如果為–N,表示被信號(hào)N結(jié)束(了解即可) 8 9 p.authkey:進(jìn)程的身份驗(yàn)證鍵,默認(rèn)是由os.urandom()隨機(jī)生成的32字符的字符串。這個(gè)鍵的用途是為涉及網(wǎng)絡(luò)連接的底層進(jìn)程間通信提供安全性,這類連接只有在具有相同的身份驗(yàn)證鍵時(shí)才能成功(了解即可)三 Process類的使用
注意:在windows中Process()必須放到# if __name__ == '__main__':下
Since Windows has no fork, the multiprocessing module starts a new Python process and imports the calling module. If Process() gets called upon import, then this sets off an infinite succession of new processes (or until your machine runs out of resources). This is the reason for hiding calls to Process() insideif __name__ == "__main__" since statements inside this if-statement will not get called upon import. 由于Windows沒有fork,多處理模塊啟動(dòng)一個(gè)新的Python進(jìn)程并導(dǎo)入調(diào)用模塊。 如果在導(dǎo)入時(shí)調(diào)用Process(),那么這將啟動(dòng)無限繼承的新進(jìn)程(或直到機(jī)器耗盡資源)。 這是隱藏對(duì)Process()內(nèi)部調(diào)用的原,使用if __name__ == “__main __”,這個(gè)if語句中的語句將不會(huì)在導(dǎo)入時(shí)被調(diào)用。創(chuàng)建并開啟子進(jìn)程的兩種方式
#開進(jìn)程的方法一: import time import random from multiprocessing import Process def piao(name):print('%s piaoing' %name)time.sleep(random.randrange(1,5))print('%s piao end' %name)p1=Process(target=piao,args=('egon',)) #必須加,號(hào) p2=Process(target=piao,args=('alex',)) p3=Process(target=piao,args=('wupeqi',)) p4=Process(target=piao,args=('yuanhao',))p1.start() p2.start() p3.start() p4.start() print('主線程')方法一 #開進(jìn)程的方法二: import time import random from multiprocessing import Processclass Piao(Process):def __init__(self,name):super().__init__()self.name=namedef run(self):print('%s piaoing' %self.name)time.sleep(random.randrange(1,5))print('%s piao end' %self.name)p1=Piao('egon') p2=Piao('alex') p3=Piao('wupeiqi') p4=Piao('yuanhao')p1.start() #start會(huì)自動(dòng)調(diào)用run p2.start() p3.start() p4.start() print('主線程')練習(xí)1:把所學(xué)的socket通信變成并發(fā)的形式
from socket import * from multiprocessing import Processserver=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5)def talk(conn,client_addr):while True:try:msg=conn.recv(1024)if not msg:breakconn.send(msg.upper())except Exception:breakif __name__ == '__main__': #windows下start進(jìn)程一定要寫到這下面while True:conn,client_addr=server.accept()p=Process(target=talk,args=(conn,client_addr))p.start()server端 from socket import *client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080))while True:msg=input('>>: ').strip()if not msg:continueclient.send(msg.encode('utf-8'))msg=client.recv(1024)print(msg.decode('utf-8'))多個(gè)client端 每來一個(gè)客戶端,都在服務(wù)端開啟一個(gè)進(jìn)程,如果并發(fā)來一個(gè)萬個(gè)客戶端,要開啟一萬個(gè)進(jìn)程嗎,你自己嘗試著在你自己的機(jī)器上開啟一萬個(gè),10萬個(gè)進(jìn)程試一試。 解決方法:進(jìn)程池Process對(duì)象的join方法
join:主進(jìn)程等,等待子進(jìn)程結(jié)束
from multiprocessing import Process import time import randomclass Piao(Process):def __init__(self,name):self.name=namesuper().__init__()def run(self):print('%s is piaoing' %self.name)time.sleep(random.randrange(1,3))print('%s is piao end' %self.name)p=Piao('egon') p.start() p.join(0.0001) #等待p停止,等0.0001秒就不再等了 print('開始')有了join,程序不就是串行了嗎???
from multiprocessing import Process import time import random def piao(name):print('%s is piaoing' %name)time.sleep(random.randint(1,3))print('%s is piao end' %name)p1=Process(target=piao,args=('egon',)) p2=Process(target=piao,args=('alex',)) p3=Process(target=piao,args=('yuanhao',)) p4=Process(target=piao,args=('wupeiqi',))p1.start() p2.start() p3.start() p4.start()#有的同學(xué)會(huì)有疑問:既然join是等待進(jìn)程結(jié)束,那么我像下面這樣寫,進(jìn)程不就又變成串行的了嗎? #當(dāng)然不是了,必須明確:p.join()是讓誰等? #很明顯p.join()是讓主線程等待p的結(jié)束,卡住的是主線程而絕非進(jìn)程p,#詳細(xì)解析如下: #進(jìn)程只要start就會(huì)在開始運(yùn)行了,所以p1-p4.start()時(shí),系統(tǒng)中已經(jīng)有四個(gè)并發(fā)的進(jìn)程了 #而我們p1.join()是在等p1結(jié)束,沒錯(cuò)p1只要不結(jié)束主線程就會(huì)一直卡在原地,這也是問題的關(guān)鍵 #join是讓主線程等,而p1-p4仍然是并發(fā)執(zhí)行的,p1.join的時(shí)候,其余p2,p3,p4仍然在運(yùn)行,等#p1.join結(jié)束,可能p2,p3,p4早已經(jīng)結(jié)束了,這樣p2.join,p3.join.p4.join直接通過檢測(cè),無需等待 # 所以4個(gè)join花費(fèi)的總時(shí)間仍然是耗費(fèi)時(shí)間最長(zhǎng)的那個(gè)進(jìn)程運(yùn)行的時(shí)間 p1.join() p2.join() p3.join() p4.join()print('主線程')#上述啟動(dòng)進(jìn)程與join進(jìn)程可以簡(jiǎn)寫為 # p_l=[p1,p2,p3,p4] # # for p in p_l: # p.start() # # for p in p_l: # p.join()?
Process對(duì)象的其他方法或?qū)傩?#xff08;了解)
terminate與is_alive #進(jìn)程對(duì)象的其他方法一:terminate,is_alive from multiprocessing import Process import time import randomclass Piao(Process):def __init__(self,name):self.name=namesuper().__init__()def run(self):print('%s is piaoing' %self.name)time.sleep(random.randrange(1,5))print('%s is piao end' %self.name)p1=Piao('egon1') p1.start()p1.terminate()#關(guān)閉進(jìn)程,不會(huì)立即關(guān)閉,所以is_alive立刻查看的結(jié)果可能還是存活 print(p1.is_alive()) #結(jié)果為Trueprint('開始') print(p1.is_alive()) #結(jié)果為Falsename與pid from multiprocessing import Process import time import random class Piao(Process):def __init__(self,name):# self.name=name# super().__init__() #Process的__init__方法會(huì)執(zhí)行self.name=Piao-1,# #所以加到這里,會(huì)覆蓋我們的self.name=name#為我們開啟的進(jìn)程設(shè)置名字的做法super().__init__()self.name=namedef run(self):print('%s is piaoing' %self.name)time.sleep(random.randrange(1,3))print('%s is piao end' %self.name)p=Piao('egon') p.start() print('開始') print(p.pid) #查看pid
?
四 守護(hù)進(jìn)程
主進(jìn)程創(chuàng)建守護(hù)進(jìn)程
其一:守護(hù)進(jìn)程會(huì)在主進(jìn)程代碼執(zhí)行結(jié)束后就終止
其二:守護(hù)進(jìn)程內(nèi)無法再開啟子進(jìn)程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children
注意:進(jìn)程之間是互相獨(dú)立的,主進(jìn)程代碼運(yùn)行結(jié)束,守護(hù)進(jìn)程隨即終止
?
from multiprocessing import Process import time import randomclass Piao(Process):def __init__(self,name):self.name=namesuper().__init__()def run(self):print('%s is piaoing' %self.name)time.sleep(random.randrange(1,3))print('%s is piao end' %self.name)p=Piao('egon') p.daemon=True #一定要在p.start()前設(shè)置,設(shè)置p為守護(hù)進(jìn)程,禁止p創(chuàng)建子進(jìn)程,并且父進(jìn)程代碼執(zhí)行結(jié)束,p即終止運(yùn)行 p.start() print('主') #主進(jìn)程代碼運(yùn)行完畢,守護(hù)進(jìn)程就會(huì)結(jié)束 from multiprocessing import Process from threading import Thread import time def foo():print(123)time.sleep(1)print("end123")def bar():print(456)time.sleep(3)print("end456")p1=Process(target=foo) p2=Process(target=bar)p1.daemon=True p1.start() p2.start() print("main-------") #打印該行則主進(jìn)程代碼結(jié)束,則守護(hù)進(jìn)程p1應(yīng)該被終止,可能會(huì)有p1任務(wù)執(zhí)行的打印信息123,因?yàn)橹鬟M(jìn)程打印main----時(shí),p1也執(zhí)行了,但是隨即被終止 迷惑人的例子五 進(jìn)程同步(鎖)
進(jìn)程之間數(shù)據(jù)不共享,但是共享同一套文件系統(tǒng),所以訪問同一個(gè)文件,或同一個(gè)打印終端,是沒有問題的,
競(jìng)爭(zhēng)帶來的結(jié)果就是錯(cuò)亂,如何控制,就是加鎖處理
part1:多個(gè)進(jìn)程共享同一打印終端
并發(fā)運(yùn)行,效率高,但競(jìng)爭(zhēng)同一打印終端,帶來了打印錯(cuò)亂 #并發(fā)運(yùn)行,效率高,但競(jìng)爭(zhēng)同一打印終端,帶來了打印錯(cuò)亂 from multiprocessing import Process import os,time def work():print('%s is running' %os.getpid())time.sleep(2)print('%s is done' %os.getpid())if __name__ == '__main__':for i in range(3):p=Process(target=work)p.start()加鎖:由并發(fā)變成了串行,犧牲了運(yùn)行效率,但避免了競(jìng)爭(zhēng) #由并發(fā)變成了串行,犧牲了運(yùn)行效率,但避免了競(jìng)爭(zhēng) from multiprocessing import Process,Lock import os,time def work(lock):lock.acquire()print('%s is running' %os.getpid())time.sleep(2)print('%s is done' %os.getpid())lock.release() if __name__ == '__main__':lock=Lock()for i in range(3):p=Process(target=work,args=(lock,))p.start()
part2:多個(gè)進(jìn)程共享同一文件
文件當(dāng)數(shù)據(jù)庫,模擬搶票
并發(fā)運(yùn)行,效率高,但競(jìng)爭(zhēng)寫同一文件,數(shù)據(jù)寫入錯(cuò)亂 #文件db的內(nèi)容為:{"count":1} #注意一定要用雙引號(hào),不然json無法識(shí)別 from multiprocessing import Process,Lock import time,json,random def search():dic=json.load(open('db.txt'))print('\033[43m剩余票數(shù)%s\033[0m' %dic['count'])def get():dic=json.load(open('db.txt'))time.sleep(0.1) #模擬讀數(shù)據(jù)的網(wǎng)絡(luò)延遲if dic['count'] >0:dic['count']-=1time.sleep(0.2) #模擬寫數(shù)據(jù)的網(wǎng)絡(luò)延遲json.dump(dic,open('db.txt','w'))print('\033[43m購票成功\033[0m')def task(lock):search()get() if __name__ == '__main__':lock=Lock()for i in range(100): #模擬并發(fā)100個(gè)客戶端搶票p=Process(target=task,args=(lock,))p.start()加鎖:購票行為由并發(fā)變成了串行,犧牲了運(yùn)行效率,但保證了數(shù)據(jù)安全 #文件db的內(nèi)容為:{"count":1} #注意一定要用雙引號(hào),不然json無法識(shí)別 from multiprocessing import Process,Lock import time,json,random def search():dic=json.load(open('db.txt'))print('\033[43m剩余票數(shù)%s\033[0m' %dic['count'])def get():dic=json.load(open('db.txt'))time.sleep(0.1) #模擬讀數(shù)據(jù)的網(wǎng)絡(luò)延遲if dic['count'] >0:dic['count']-=1time.sleep(0.2) #模擬寫數(shù)據(jù)的網(wǎng)絡(luò)延遲json.dump(dic,open('db.txt','w'))print('\033[43m購票成功\033[0m')def task(lock):search()lock.acquire()get()lock.release() if __name__ == '__main__':lock=Lock()for i in range(100): #模擬并發(fā)100個(gè)客戶端搶票p=Process(target=task,args=(lock,))p.start()
總結(jié):
加鎖可以保證多個(gè)進(jìn)程修改同一塊數(shù)據(jù)時(shí),同一時(shí)間只能有一個(gè)任務(wù)可以進(jìn)行修改,即串行的修改,沒錯(cuò),速度是慢了,但犧牲了速度卻保證了數(shù)據(jù)安全。
雖然可以用文件共享數(shù)據(jù)實(shí)現(xiàn)進(jìn)程間通信,但問題是:
1.效率低
2.需要自己加鎖處理
?
為此mutiprocessing模塊為我們提供了基于消息的IPC通信機(jī)制:隊(duì)列和管道。
1 隊(duì)列和管道都是將數(shù)據(jù)存放于內(nèi)存中
2 隊(duì)列又是基于(管道+鎖)實(shí)現(xiàn)的,可以讓我們從復(fù)雜的鎖問題中解脫出來,
我們應(yīng)該盡量避免使用共享數(shù)據(jù),盡可能使用消息傳遞和隊(duì)列,避免處理復(fù)雜的同步和鎖問題,而且在進(jìn)程數(shù)目增多時(shí),往往可以獲得更好的可獲展性。
?
六 隊(duì)列(推薦使用)
?? 進(jìn)程彼此之間互相隔離,要實(shí)現(xiàn)進(jìn)程間通信(IPC),multiprocessing模塊支持兩種形式:隊(duì)列和管道,這兩種方式都是使用消息傳遞的
創(chuàng)建隊(duì)列的類(底層就是以管道和鎖定的方式實(shí)現(xiàn)):
1 Queue([maxsize]):創(chuàng)建共享的進(jìn)程隊(duì)列,Queue是多進(jìn)程安全的隊(duì)列,可以使用Queue實(shí)現(xiàn)多進(jìn)程之間的數(shù)據(jù)傳遞。?? ??參數(shù)介紹:
1 maxsize是隊(duì)列中允許最大項(xiàng)數(shù),省略則無大小限制。?方法介紹:
主要方法: q.put方法用以插入數(shù)據(jù)到隊(duì)列中,put方法還有兩個(gè)可選參數(shù):blocked和timeout。如果blocked為True(默認(rèn)值),并且timeout為正值,該方法會(huì)阻塞timeout指定的時(shí)間,直到該隊(duì)列有剩余的空間。如果超時(shí),會(huì)拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會(huì)立即拋出Queue.Full異常。 q.get方法可以從隊(duì)列讀取并且刪除一個(gè)元素。同樣,get方法有兩個(gè)可選參數(shù):blocked和timeout。如果blocked為True(默認(rèn)值),并且timeout為正值,那么在等待時(shí)間內(nèi)沒有取到任何元素,會(huì)拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個(gè)值可用,則立即返回該值,否則,如果隊(duì)列為空,則立即拋出Queue.Empty異常.q.get_nowait():同q.get(False) q.put_nowait():同q.put(False)q.empty():調(diào)用此方法時(shí)q為空則返回True,該結(jié)果不可靠,比如在返回True的過程中,如果隊(duì)列中又加入了項(xiàng)目。 q.full():調(diào)用此方法時(shí)q已滿則返回True,該結(jié)果不可靠,比如在返回True的過程中,如果隊(duì)列中的項(xiàng)目被取走。 q.qsize():返回隊(duì)列中目前項(xiàng)目的正確數(shù)量,結(jié)果也不可靠,理由同q.empty()和q.full()一樣其他方法(了解):
1 q.cancel_join_thread():不會(huì)在進(jìn)程退出時(shí)自動(dòng)連接后臺(tái)線程??梢苑乐筳oin_thread()方法阻塞 2 q.close():關(guān)閉隊(duì)列,防止隊(duì)列中加入更多數(shù)據(jù)。調(diào)用此方法,后臺(tái)線程將繼續(xù)寫入那些已經(jīng)入隊(duì)列但尚未寫入的數(shù)據(jù),但將在此方法完成時(shí)馬上關(guān)閉。如果q被垃圾收集,將調(diào)用此方法。關(guān)閉隊(duì)列不會(huì)在隊(duì)列使用者中產(chǎn)生任何類型的數(shù)據(jù)結(jié)束信號(hào)或異常。例如,如果某個(gè)使用者正在被阻塞在get()操作上,關(guān)閉生產(chǎn)者中的隊(duì)列不會(huì)導(dǎo)致get()方法返回錯(cuò)誤。 3 q.join_thread():連接隊(duì)列的后臺(tái)線程。此方法用于在調(diào)用q.close()方法之后,等待所有隊(duì)列項(xiàng)被消耗。默認(rèn)情況下,此方法由不是q的原始創(chuàng)建者的所有進(jìn)程調(diào)用。調(diào)用q.cancel_join_thread方法可以禁止這種行為? 應(yīng)用:
''' multiprocessing模塊支持進(jìn)程間通信的兩種主要形式:管道和隊(duì)列 都是基于消息傳遞實(shí)現(xiàn)的,但是隊(duì)列接口 '''from multiprocessing import Process,Queue import time q=Queue(3)#put ,get ,put_nowait,get_nowait,full,empty q.put(3) q.put(3) q.put(3) print(q.full()) #滿了print(q.get()) print(q.get()) print(q.get()) print(q.empty()) #空了生產(chǎn)者消費(fèi)者模型
在并發(fā)編程中使用生產(chǎn)者和消費(fèi)者模式能夠解決絕大多數(shù)并發(fā)問題。該模式通過平衡生產(chǎn)線程和消費(fèi)線程的工作能力來提高程序的整體處理數(shù)據(jù)的速度。
? ? 為什么要使用生產(chǎn)者和消費(fèi)者模式
在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費(fèi)者就是消費(fèi)數(shù)據(jù)的線程。在多線程開發(fā)當(dāng)中,如果生產(chǎn)者處理速度很快,而消費(fèi)者處理速度很慢,那么生產(chǎn)者就必須等待消費(fèi)者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費(fèi)者的處理能力大于生產(chǎn)者,那么消費(fèi)者就必須等待生產(chǎn)者。為了解決這個(gè)問題于是引入了生產(chǎn)者和消費(fèi)者模式。
? ? 什么是生產(chǎn)者消費(fèi)者模式
生產(chǎn)者消費(fèi)者模式是通過一個(gè)容器來解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問題。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而通過阻塞隊(duì)列來進(jìn)行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理,直接扔給阻塞隊(duì)列,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊(duì)列里取,阻塞隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力。
基于隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型
from multiprocessing import Process,Queue import time,random,os def consumer(q):while True:res=q.get()time.sleep(random.randint(1,3))print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(q):for i in range(10):time.sleep(random.randint(1,3))res='包子%s' %iq.put(res)print('\033[44m%s 生產(chǎn)了 %s\033[0m' %(os.getpid(),res))if __name__ == '__main__':q=Queue()#生產(chǎn)者們:即廚師們p1=Process(target=producer,args=(q,))#消費(fèi)者們:即吃貨們c1=Process(target=consumer,args=(q,))#開始 p1.start()c1.start()print('主')此時(shí)的問題是主進(jìn)程永遠(yuǎn)不會(huì)結(jié)束,原因是:生產(chǎn)者p在生產(chǎn)完后就結(jié)束了,但是消費(fèi)者c在取空了q之后,則一直處于死循環(huán)中且卡在q.get()這一步。
解決方式無非是讓生產(chǎn)者在生產(chǎn)完畢后,往隊(duì)列中再發(fā)一個(gè)結(jié)束信號(hào),這樣消費(fèi)者在接收到結(jié)束信號(hào)后就可以break出死循環(huán)
?
生產(chǎn)者在生產(chǎn)完畢后發(fā)送結(jié)束信號(hào)None from multiprocessing import Process,Queue import time,random,os def consumer(q):while True:res=q.get()if res is None:break #收到結(jié)束信號(hào)則結(jié)束time.sleep(random.randint(1,3))print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(q):for i in range(10):time.sleep(random.randint(1,3))res='包子%s' %iq.put(res)print('\033[44m%s 生產(chǎn)了 %s\033[0m' %(os.getpid(),res))q.put(None) #發(fā)送結(jié)束信號(hào) if __name__ == '__main__':q=Queue()#生產(chǎn)者們:即廚師們p1=Process(target=producer,args=(q,))#消費(fèi)者們:即吃貨們c1=Process(target=consumer,args=(q,))#開始 p1.start()c1.start()print('主')注意:結(jié)束信號(hào)None,不一定要由生產(chǎn)者發(fā),主進(jìn)程里同樣可以發(fā),但主進(jìn)程需要等生產(chǎn)者結(jié)束后才應(yīng)該發(fā)送該信號(hào)
主進(jìn)程在生產(chǎn)者生產(chǎn)完畢后發(fā)送結(jié)束信號(hào)None from multiprocessing import Process,Queue import time,random,os def consumer(q):while True:res=q.get()if res is None:break #收到結(jié)束信號(hào)則結(jié)束time.sleep(random.randint(1,3))print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(q):for i in range(2):time.sleep(random.randint(1,3))res='包子%s' %iq.put(res)print('\033[44m%s 生產(chǎn)了 %s\033[0m' %(os.getpid(),res))if __name__ == '__main__':q=Queue()#生產(chǎn)者們:即廚師們p1=Process(target=producer,args=(q,))#消費(fèi)者們:即吃貨們c1=Process(target=consumer,args=(q,))#開始 p1.start()c1.start()p1.join()q.put(None) #發(fā)送結(jié)束信號(hào)print('主')但上述解決方式,在有多個(gè)生產(chǎn)者和多個(gè)消費(fèi)者時(shí),我們則需要用一個(gè)很low的方式去解決
有幾個(gè)生產(chǎn)者就需要發(fā)送幾次結(jié)束信號(hào):相當(dāng)low from multiprocessing import Process,Queue import time,random,os def consumer(q):while True:res=q.get()if res is None:break #收到結(jié)束信號(hào)則結(jié)束time.sleep(random.randint(1,3))print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(name,q):for i in range(2):time.sleep(random.randint(1,3))res='%s%s' %(name,i)q.put(res)print('\033[44m%s 生產(chǎn)了 %s\033[0m' %(os.getpid(),res))if __name__ == '__main__':q=Queue()#生產(chǎn)者們:即廚師們p1=Process(target=producer,args=('包子',q))p2=Process(target=producer,args=('骨頭',q))p3=Process(target=producer,args=('泔水',q))#消費(fèi)者們:即吃貨們c1=Process(target=consumer,args=(q,))c2=Process(target=consumer,args=(q,))#開始 p1.start()p2.start()p3.start()c1.start()p1.join() #必須保證生產(chǎn)者全部生產(chǎn)完畢,才應(yīng)該發(fā)送結(jié)束信號(hào) p2.join()p3.join()q.put(None) #有幾個(gè)生產(chǎn)者就應(yīng)該發(fā)送幾次結(jié)束信號(hào)Noneq.put(None) #發(fā)送結(jié)束信號(hào)q.put(None) #發(fā)送結(jié)束信號(hào)print('主')?
其實(shí)我們的思路無非是發(fā)送結(jié)束信號(hào)而已,有另外一種隊(duì)列提供了這種機(jī)制
#JoinableQueue([maxsize]):這就像是一個(gè)Queue對(duì)象,但隊(duì)列允許項(xiàng)目的使用者通知生成者項(xiàng)目已經(jīng)被成功處理。通知進(jìn)程是使用共享的信號(hào)和條件變量來實(shí)現(xiàn)的。#參數(shù)介紹: maxsize是隊(duì)列中允許最大項(xiàng)數(shù),省略則無大小限制。 #方法介紹: JoinableQueue的實(shí)例p除了與Queue對(duì)象相同的方法之外還具有:q.task_done():使用者使用此方法發(fā)出信號(hào),表示q.get()的返回項(xiàng)目已經(jīng)被處理。如果調(diào)用此方法的次數(shù)大于從隊(duì)列中刪除項(xiàng)目的數(shù)量,將引發(fā)ValueError異常q.join():生產(chǎn)者調(diào)用此方法進(jìn)行阻塞,直到隊(duì)列中所有的項(xiàng)目均被處理。阻塞將持續(xù)到隊(duì)列中的每個(gè)項(xiàng)目均調(diào)用q.task_done()方法為止 from multiprocessing import Process,JoinableQueue import time,random,os def consumer(q):while True:res=q.get()time.sleep(random.randint(1,3))print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))q.task_done() #向q.join()發(fā)送一次信號(hào),證明一個(gè)數(shù)據(jù)已經(jīng)被取走了def producer(name,q):for i in range(10):time.sleep(random.randint(1,3))res='%s%s' %(name,i)q.put(res)print('\033[44m%s 生產(chǎn)了 %s\033[0m' %(os.getpid(),res))q.join()if __name__ == '__main__':q=JoinableQueue()#生產(chǎn)者們:即廚師們p1=Process(target=producer,args=('包子',q))p2=Process(target=producer,args=('骨頭',q))p3=Process(target=producer,args=('泔水',q))#消費(fèi)者們:即吃貨們c1=Process(target=consumer,args=(q,))c2=Process(target=consumer,args=(q,))c1.daemon=Truec2.daemon=True#開始p_l=[p1,p2,p3,c1,c2]for p in p_l:p.start()p1.join()p2.join()p3.join()print('主') #主進(jìn)程等--->p1,p2,p3等---->c1,c2#p1,p2,p3結(jié)束了,證明c1,c2肯定全都收完了p1,p2,p3發(fā)到隊(duì)列的數(shù)據(jù)#因而c1,c2也沒有存在的價(jià)值了,應(yīng)該隨著主進(jìn)程的結(jié)束而結(jié)束,所以設(shè)置成守護(hù)進(jìn)程七 管道
進(jìn)程間通信(IPC)方式二:管道(不推薦使用,了解即可)
?
#創(chuàng)建管道的類: Pipe([duplex]):在進(jìn)程之間創(chuàng)建一條管道,并返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連接對(duì)象,強(qiáng)調(diào)一點(diǎn):必須在產(chǎn)生Process對(duì)象之前產(chǎn)生管道 #參數(shù)介紹: dumplex:默認(rèn)管道是全雙工的,如果將duplex射成False,conn1只能用于接收,conn2只能用于發(fā)送。 #主要方法: conn1.recv():接收conn2.send(obj)發(fā)送的對(duì)象。如果沒有消息可接收,recv方法會(huì)一直阻塞。如果連接的另外一端已經(jīng)關(guān)閉,那么recv方法會(huì)拋出EOFError。conn1.send(obj):通過連接發(fā)送對(duì)象。obj是與序列化兼容的任意對(duì)象#其他方法: conn1.close():關(guān)閉連接。如果conn1被垃圾回收,將自動(dòng)調(diào)用此方法 conn1.fileno():返回連接使用的整數(shù)文件描述符 conn1.poll([timeout]):如果連接上的數(shù)據(jù)可用,返回True。timeout指定等待的最長(zhǎng)時(shí)限。如果省略此參數(shù),方法將立即返回結(jié)果。如果將timeout射成None,操作將無限期地等待數(shù)據(jù)到達(dá)。conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發(fā)送的一條完整的字節(jié)消息。maxlength指定要接收的最大字節(jié)數(shù)。如果進(jìn)入的消息,超過了這個(gè)最大值,將引發(fā)IOError異常,并且在連接上無法進(jìn)行進(jìn)一步讀取。如果連接的另外一端已經(jīng)關(guān)閉,再也不存在任何數(shù)據(jù),將引發(fā)EOFError異常。 conn.send_bytes(buffer [, offset [, size]]):通過連接發(fā)送字節(jié)數(shù)據(jù)緩沖區(qū),buffer是支持緩沖區(qū)接口的任意對(duì)象,offset是緩沖區(qū)中的字節(jié)偏移量,而size是要發(fā)送字節(jié)數(shù)。結(jié)果數(shù)據(jù)以單條消息的形式發(fā)出,然后調(diào)用c.recv_bytes()函數(shù)進(jìn)行接收 conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節(jié)消息,并把它保存在buffer對(duì)象中,該對(duì)象支持可寫入的緩沖區(qū)接口(即bytearray對(duì)象或類似的對(duì)象)。offset指定緩沖區(qū)中放置消息處的字節(jié)位移。返回值是收到的字節(jié)數(shù)。如果消息長(zhǎng)度大于可用的緩沖區(qū)空間,將引發(fā)BufferTooShort異常。介紹?
基于管道實(shí)現(xiàn)進(jìn)程間通信(與隊(duì)列的方式是類似的,隊(duì)列就是管道加鎖實(shí)現(xiàn)的) from multiprocessing import Process,Pipeimport time,os def consumer(p,name):left,right=pleft.close()while True:try:baozi=right.recv()print('%s 收到包子:%s' %(name,baozi))except EOFError:right.close()break def producer(seq,p):left,right=pright.close()for i in seq:left.send(i)# time.sleep(1)else:left.close() if __name__ == '__main__':left,right=Pipe()c1=Process(target=consumer,args=((left,right),'c1'))c1.start()seq=(i for i in range(10))producer(seq,(left,right))right.close()left.close()c1.join()print('主進(jìn)程')注意:生產(chǎn)者和消費(fèi)者都沒有使用管道的某個(gè)端點(diǎn),就應(yīng)該將其關(guān)閉,如在生產(chǎn)者中關(guān)閉管道的右端,在消費(fèi)者中關(guān)閉管道的左端。如果忘記執(zhí)行這些步驟,程序可能再消費(fèi)者中的recv()操作上掛起。管道是由操作系統(tǒng)進(jìn)行引用計(jì)數(shù)的,必須在所有進(jìn)程中關(guān)閉管道后才能生產(chǎn)EOFError異常。因此在生產(chǎn)者中關(guān)閉管道不會(huì)有任何效果,付費(fèi)消費(fèi)者中也關(guān)閉了相同的管道端點(diǎn)。
?
管道可以用于雙向通信,利用通常在客戶端/服務(wù)器中使用的請(qǐng)求/響應(yīng)模型或遠(yuǎn)程過程調(diào)用,就可以使用管道編寫與進(jìn)程交互的程序 from multiprocessing import Process,Pipeimport time,os def adder(p,name):server,client=pclient.close()while True:try:x,y=server.recv()except EOFError:server.close()breakres=x+yserver.send(res)print('server done') if __name__ == '__main__':server,client=Pipe()c1=Process(target=adder,args=((server,client),'c1'))c1.start()server.close()client.send((10,20))print(client.recv())client.close()c1.join()print('主進(jìn)程') #注意:send()和recv()方法使用pickle模塊對(duì)對(duì)象進(jìn)行序列化。?
八 共享數(shù)據(jù)
展望未來,基于消息傳遞的并發(fā)編程是大勢(shì)所趨
即便是使用線程,推薦做法也是將程序設(shè)計(jì)為大量獨(dú)立的線程集合
通過消息隊(duì)列交換數(shù)據(jù)。這樣極大地減少了對(duì)使用鎖定和其他同步手段的需求,
還可以擴(kuò)展到分布式系統(tǒng)中
進(jìn)程間通信應(yīng)該盡量避免使用本節(jié)所講的共享數(shù)據(jù)的方式
進(jìn)程間數(shù)據(jù)是獨(dú)立的,可以借助于隊(duì)列或管道實(shí)現(xiàn)通信,二者都是基于消息傳遞的雖然進(jìn)程間數(shù)據(jù)獨(dú)立,但可以通過Manager實(shí)現(xiàn)數(shù)據(jù)共享,事實(shí)上Manager的功能遠(yuǎn)不止于此A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example, #進(jìn)程之間操作共享的數(shù)據(jù)from multiprocessing import Manager,Process,Lock import os def work(d,lock):# with lock: #不加鎖而操作共享的數(shù)據(jù),肯定會(huì)出現(xiàn)數(shù)據(jù)錯(cuò)亂d['count']-=1if __name__ == '__main__':lock=Lock()with Manager() as m:dic=m.dict({'count':100})p_l=[]for i in range(100):p=Process(target=work,args=(dic,lock))p_l.append(p)p.start()for p in p_l:p.join()print(dic)#{'count': 94}
?
九 信號(hào)量(了解)
互斥鎖 同時(shí)只允許一個(gè)線程更改數(shù)據(jù),而Semaphore是同時(shí)允許一定數(shù)量的線程更改數(shù)據(jù) ,比如廁所有3個(gè)坑,那最多只允許3個(gè)人上廁所,后面的人只能等里面有人出來了才能再進(jìn)去,如果指定信號(hào)量為3,那么來一個(gè)人獲得一把鎖,計(jì)數(shù)加1,當(dāng)計(jì)數(shù)等于3時(shí),后面的人均需要等待。一旦釋放,就有人可以獲得一把鎖信號(hào)量與進(jìn)程池的概念很像,但是要區(qū)分開,信號(hào)量涉及到加鎖的概念from multiprocessing import Process,Semaphore import time,randomdef go_wc(sem,user):sem.acquire()print('%s 占到一個(gè)茅坑' %user)time.sleep(random.randint(0,3)) #模擬每個(gè)人拉屎速度不一樣,0代表有的人蹲下就起來了 sem.release()if __name__ == '__main__':sem=Semaphore(5)p_l=[]for i in range(13):p=Process(target=go_wc,args=(sem,'user%s' %i,))p.start()p_l.append(p)for i in p_l:i.join()print('============》')信號(hào)量Semahpore(同線程一樣)十 事件(了解)
python線程的事件用于主線程控制其他線程的執(zhí)行,事件主要提供了三個(gè)方法 set、wait、clear。事件處理的機(jī)制:全局定義了一個(gè)“Flag”,如果“Flag”值為 False,那么當(dāng)程序執(zhí)行 event.wait 方法時(shí)就會(huì)阻塞,如果“Flag”值為True,那么event.wait 方法時(shí)便不再阻塞。clear:將“Flag”設(shè)置為False set:將“Flag”設(shè)置為True#_*_coding:utf-8_*_ #!/usr/bin/env pythonfrom multiprocessing import Process,Event import time,randomdef car(e,n):while True:if not e.is_set(): #Flaseprint('\033[31m紅燈亮\033[0m,car%s等著' %n)e.wait()print('\033[32m車%s 看見綠燈亮了\033[0m' %n)time.sleep(random.randint(3,6))if not e.is_set():continueprint('走你,car', n)breakdef police_car(e,n):while True:if not e.is_set():print('\033[31m紅燈亮\033[0m,car%s等著' % n)e.wait(1)print('燈的是%s,警車走了,car %s' %(e.is_set(),n))breakdef traffic_lights(e,inverval):while True:time.sleep(inverval)if e.is_set():e.clear() #e.is_set() ---->Falseelse:e.set()if __name__ == '__main__':e=Event()# for i in range(10):# p=Process(target=car,args=(e,i,))# p.start()for i in range(5):p = Process(target=police_car, args=(e, i,))p.start()t=Process(target=traffic_lights,args=(e,10))t.start()print('============》')Event(同線程一樣)?
十一 進(jìn)程池
在利用Python進(jìn)行系統(tǒng)管理的時(shí)候,特別是同時(shí)操作多個(gè)文件目錄,或者遠(yuǎn)程控制多臺(tái)主機(jī),并行操作可以節(jié)約大量的時(shí)間。多進(jìn)程是實(shí)現(xiàn)并發(fā)的手段之一,需要注意的問題是:
例如當(dāng)被操作對(duì)象數(shù)目不大時(shí),可以直接利用multiprocessing中的Process動(dòng)態(tài)成生多個(gè)進(jìn)程,十幾個(gè)還好,但如果是上百個(gè),上千個(gè)。。。手動(dòng)的去限制進(jìn)程數(shù)量卻又太過繁瑣,此時(shí)可以發(fā)揮進(jìn)程池的功效。
我們就可以通過維護(hù)一個(gè)進(jìn)程池來控制進(jìn)程數(shù)目,比如httpd的進(jìn)程模式,規(guī)定最小進(jìn)程數(shù)和最大進(jìn)程數(shù)...?
ps:對(duì)于遠(yuǎn)程過程調(diào)用的高級(jí)應(yīng)用程序而言,應(yīng)該使用進(jìn)程池,Pool可以提供指定數(shù)量的進(jìn)程,供用戶調(diào)用,當(dāng)有新的請(qǐng)求提交到pool中時(shí),如果池還沒有滿,那么就會(huì)創(chuàng)建一個(gè)新的進(jìn)程用來執(zhí)行該請(qǐng)求;但如果池中的進(jìn)程數(shù)已經(jīng)達(dá)到規(guī)定最大值,那么該請(qǐng)求就會(huì)等待,直到池中有進(jìn)程結(jié)束,就重用進(jìn)程池中的進(jìn)程。
? ??創(chuàng)建進(jìn)程池的類:如果指定numprocess為3,則進(jìn)程池會(huì)從無到有創(chuàng)建三個(gè)進(jìn)程,然后自始至終使用這三個(gè)進(jìn)程去執(zhí)行所有任務(wù),不會(huì)開啟其他進(jìn)程
1 Pool([numprocess [,initializer [, initargs]]]):創(chuàng)建進(jìn)程池?? ??參數(shù)介紹:
1 numprocess:要?jiǎng)?chuàng)建的進(jìn)程數(shù),如果省略,將默認(rèn)使用cpu_count()的值 2 initializer:是每個(gè)工作進(jìn)程啟動(dòng)時(shí)要執(zhí)行的可調(diào)用對(duì)象,默認(rèn)為None 3 initargs:是要傳給initializer的參數(shù)組?方法介紹:
主要方法: p.apply(func [, args [, kwargs]]):在一個(gè)池工作進(jìn)程中執(zhí)行func(*args,**kwargs),然后返回結(jié)果。需要強(qiáng)調(diào)的是:此操作并不會(huì)在所有池工作進(jìn)程中并執(zhí)行func函數(shù)。如果要通過不同參數(shù)并發(fā)地執(zhí)行func函數(shù),必須從不同線程調(diào)用p.apply()函數(shù)或者使用p.apply_async() p.apply_async(func [, args [, kwargs]]):在一個(gè)池工作進(jìn)程中執(zhí)行func(*args,**kwargs),然后返回結(jié)果。此方法的結(jié)果是AsyncResult類的實(shí)例,callback是可調(diào)用對(duì)象,接收輸入?yún)?shù)。當(dāng)func的結(jié)果變?yōu)榭捎脮r(shí),將理解傳遞給callback。callback禁止執(zhí)行任何阻塞操作,否則將接收其他異步操作中的結(jié)果。p.close():關(guān)閉進(jìn)程池,防止進(jìn)一步操作。如果所有操作持續(xù)掛起,它們將在工作進(jìn)程終止前完成 P.jion():等待所有工作進(jìn)程退出。此方法只能在close()或teminate()之后調(diào)用?其他方法(了解部分)
方法apply_async()和map_async()的返回值是AsyncResul的實(shí)例obj。實(shí)例具有以下方法 obj.get():返回結(jié)果,如果有必要?jiǎng)t等待結(jié)果到達(dá)。timeout是可選的。如果在指定時(shí)間內(nèi)還沒有到達(dá),將引發(fā)一場(chǎng)。如果遠(yuǎn)程操作中引發(fā)了異常,它將在調(diào)用此方法時(shí)再次被引發(fā)。 obj.ready():如果調(diào)用完成,返回True obj.successful():如果調(diào)用完成且沒有引發(fā)異常,返回True,如果在結(jié)果就緒之前調(diào)用此方法,引發(fā)異常 obj.wait([timeout]):等待結(jié)果變?yōu)榭捎谩?obj.terminate():立即終止所有工作進(jìn)程,同時(shí)不執(zhí)行任何清理或結(jié)束任何掛起工作。如果p被垃圾回收,將自動(dòng)調(diào)用此函數(shù)??應(yīng)用:
apply同步執(zhí)行:阻塞式 from multiprocessing import Pool import os,time def work(n):print('%s run' %os.getpid())time.sleep(3)return n**2if __name__ == '__main__':p=Pool(3) #進(jìn)程池中從無到有創(chuàng)建三個(gè)進(jìn)程,以后一直是這三個(gè)進(jìn)程在執(zhí)行任務(wù)res_l=[]for i in range(10):res=p.apply(work,args=(i,)) #同步運(yùn)行,阻塞、直到本次任務(wù)執(zhí)行完畢拿到res res_l.append(res)print(res_l)apply_async異步執(zhí)行:非阻塞 from multiprocessing import Pool import os,time def work(n):print('%s run' %os.getpid())time.sleep(3)return n**2if __name__ == '__main__':p=Pool(3) #進(jìn)程池中從無到有創(chuàng)建三個(gè)進(jìn)程,以后一直是這三個(gè)進(jìn)程在執(zhí)行任務(wù)res_l=[]for i in range(10):res=p.apply_async(work,args=(i,)) #同步運(yùn)行,阻塞、直到本次任務(wù)執(zhí)行完畢拿到res res_l.append(res)#異步apply_async用法:如果使用異步提交的任務(wù),主進(jìn)程需要使用jion,等待進(jìn)程池內(nèi)任務(wù)都處理完,然后可以用get收集結(jié)果,否則,主進(jìn)程結(jié)束,進(jìn)程池可能還沒來得及執(zhí)行,也就跟著一起結(jié)束了 p.close()p.join()for res in res_l:print(res.get()) #使用get來獲取apply_aync的結(jié)果,如果是apply,則沒有g(shù)et方法,因?yàn)閍pply是同步執(zhí)行,立刻獲取結(jié)果,也根本無需get
?
詳解:apply_async與apply
#一:使用進(jìn)程池(非阻塞,apply_async) #coding: utf-8 from multiprocessing import Process,Pool import timedef func(msg):print( "msg:", msg)time.sleep(1)return msgif __name__ == "__main__":pool = Pool(processes = 3)res_l=[]for i in range(10):msg = "hello %d" %(i)res=pool.apply_async(func, (msg, )) #維持執(zhí)行的進(jìn)程總數(shù)為processes,當(dāng)一個(gè)進(jìn)程執(zhí)行完畢后會(huì)添加新的進(jìn)程進(jìn)去 res_l.append(res)print("==============================>") #沒有后面的join,或get,則程序整體結(jié)束,進(jìn)程池中的任務(wù)還沒來得及全部執(zhí)行完也都跟著主進(jìn)程一起結(jié)束了 pool.close() #關(guān)閉進(jìn)程池,防止進(jìn)一步操作。如果所有操作持續(xù)掛起,它們將在工作進(jìn)程終止前完成pool.join() #調(diào)用join之前,先調(diào)用close函數(shù),否則會(huì)出錯(cuò)。執(zhí)行完close后不會(huì)有新的進(jìn)程加入到pool,join函數(shù)等待所有子進(jìn)程結(jié)束print(res_l) #看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>對(duì)象組成的列表,而非最終的結(jié)果,但這一步是在join后執(zhí)行的,證明結(jié)果已經(jīng)計(jì)算完畢,剩下的事情就是調(diào)用每個(gè)對(duì)象下的get方法去獲取結(jié)果for i in res_l:print(i.get()) #使用get來獲取apply_aync的結(jié)果,如果是apply,則沒有g(shù)et方法,因?yàn)閍pply是同步執(zhí)行,立刻獲取結(jié)果,也根本無需get#二:使用進(jìn)程池(阻塞,apply) #coding: utf-8 from multiprocessing import Process,Pool import timedef func(msg):print( "msg:", msg)time.sleep(0.1)return msgif __name__ == "__main__":pool = Pool(processes = 3)res_l=[]for i in range(10):msg = "hello %d" %(i)res=pool.apply(func, (msg, )) #維持執(zhí)行的進(jìn)程總數(shù)為processes,當(dāng)一個(gè)進(jìn)程執(zhí)行完畢后會(huì)添加新的進(jìn)程進(jìn)去res_l.append(res) #同步執(zhí)行,即執(zhí)行完一個(gè)拿到結(jié)果,再去執(zhí)行另外一個(gè)print("==============================>")pool.close()pool.join() #調(diào)用join之前,先調(diào)用close函數(shù),否則會(huì)出錯(cuò)。執(zhí)行完close后不會(huì)有新的進(jìn)程加入到pool,join函數(shù)等待所有子進(jìn)程結(jié)束print(res_l) #看到的就是最終的結(jié)果組成的列表for i in res_l: #apply是同步的,所以直接得到結(jié)果,沒有g(shù)et()方法print(i)練習(xí)2:使用進(jìn)程池維護(hù)固定數(shù)目的進(jìn)程(重寫練習(xí)1)
server端 #Pool內(nèi)的進(jìn)程數(shù)默認(rèn)是cpu核數(shù),假設(shè)為4(查看方法os.cpu_count()) #開啟6個(gè)客戶端,會(huì)發(fā)現(xiàn)2個(gè)客戶端處于等待狀態(tài) #在每個(gè)進(jìn)程內(nèi)查看pid,會(huì)發(fā)現(xiàn)pid使用為4個(gè),即多個(gè)客戶端公用4個(gè)進(jìn)程 from socket import * from multiprocessing import Pool import osserver=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5)def talk(conn,client_addr):print('進(jìn)程pid: %s' %os.getpid())while True:try:msg=conn.recv(1024)if not msg:breakconn.send(msg.upper())except Exception:breakif __name__ == '__main__':p=Pool()while True:conn,client_addr=server.accept()p.apply_async(talk,args=(conn,client_addr))# p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時(shí)間只有一個(gè)客戶端能訪問?
客戶端
?
發(fā)現(xiàn):并發(fā)開啟多個(gè)客戶端,服務(wù)端同一時(shí)間只有3個(gè)不同的pid,干掉一個(gè)客戶端,另外一個(gè)客戶端才會(huì)進(jìn)來,被3個(gè)進(jìn)程之一處理
?
回掉函數(shù):
需要回調(diào)函數(shù)的場(chǎng)景:進(jìn)程池中任何一個(gè)任務(wù)一旦處理完了,就立即告知主進(jìn)程:我好了額,你可以處理我的結(jié)果了。主進(jìn)程則調(diào)用一個(gè)函數(shù)去處理該結(jié)果,該函數(shù)即回調(diào)函數(shù)
我們可以把耗時(shí)間(阻塞)的任務(wù)放到進(jìn)程池中,然后指定回調(diào)函數(shù)(主進(jìn)程負(fù)責(zé)執(zhí)行),這樣主進(jìn)程在執(zhí)行回調(diào)函數(shù)時(shí)就省去了I/O的過程,直接拿到的是任務(wù)的結(jié)果。
from multiprocessing import Pool import requests import json import osdef get_page(url):print('<進(jìn)程%s> get %s' %(os.getpid(),url))respone=requests.get(url)if respone.status_code == 200:return {'url':url,'text':respone.text}def pasrse_page(res):print('<進(jìn)程%s> parse %s' %(os.getpid(),res['url']))parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))with open('db.txt','a') as f:f.write(parse_res)if __name__ == '__main__':urls=['https://www.baidu.com','https://www.python.org','https://www.openstack.org','https://help.github.com/','http://www.sina.com.cn/']p=Pool(3)res_l=[]for url in urls:res=p.apply_async(get_page,args=(url,),callback=pasrse_page)res_l.append(res)p.close()p.join()print([res.get() for res in res_l]) #拿到的是get_page的結(jié)果,其實(shí)完全沒必要拿該結(jié)果,該結(jié)果已經(jīng)傳給回調(diào)函數(shù)處理了''' 打印結(jié)果: <進(jìn)程3388> get https://www.baidu.com <進(jìn)程3389> get https://www.python.org <進(jìn)程3390> get https://www.openstack.org <進(jìn)程3388> get https://help.github.com/ <進(jìn)程3387> parse https://www.baidu.com <進(jìn)程3389> get http://www.sina.com.cn/ <進(jìn)程3387> parse https://www.python.org <進(jìn)程3387> parse https://help.github.com/ <進(jìn)程3387> parse http://www.sina.com.cn/ <進(jìn)程3387> parse https://www.openstack.org [{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}] '''爬蟲案例
from multiprocessing import Pool import time,random import requests import redef get_page(url,pattern):response=requests.get(url)if response.status_code == 200:return (response.text,pattern)def parse_page(info):page_content,pattern=infores=re.findall(pattern,page_content)for item in res:dic={'index':item[0],'title':item[1],'actor':item[2].strip()[3:],'time':item[3][5:],'score':item[4]+item[5]}print(dic) if __name__ == '__main__':pattern1=re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S)url_dic={'http://maoyan.com/board/7':pattern1,}p=Pool()res_l=[]for url,pattern in url_dic.items():res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)res_l.append(res)for i in res_l:i.get()# res=requests.get('http://maoyan.com/board/7')# print(re.findall(pattern,res.text))如果在主進(jìn)程中等待進(jìn)程池中所有任務(wù)都執(zhí)行完畢后,再統(tǒng)一處理結(jié)果,則無需回調(diào)函數(shù)
from multiprocessing import Pool import time,random,osdef work(n):time.sleep(1)return n**2 if __name__ == '__main__':p=Pool()res_l=[]for i in range(10):res=p.apply_async(work,args=(i,))res_l.append(res)p.close()p.join() #等待進(jìn)程池中所有進(jìn)程執(zhí)行完畢 nums=[]for res in res_l:nums.append(res.get()) #拿到所有結(jié)果print(nums) #主進(jìn)程拿到所有的處理結(jié)果,可以在主進(jìn)程中進(jìn)行統(tǒng)一進(jìn)行處理進(jìn)程池的其他實(shí)現(xiàn)方式:https://docs.python.org/dev/library/concurrent.futures.html
?
轉(zhuǎn)載于:https://www.cnblogs.com/ctztake/p/7445428.html
總結(jié)
以上是生活随笔為你收集整理的Cpython解释器支持的进程与线程的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: php fopen插入文本_PHP 文件
- 下一篇: Python requests介绍之接口