进程池Pool
Pool
在利用Python進行系統(tǒng)管理的時候,特別是同時操作多個文件目錄,或者遠程控制多臺主機,并行操作可以節(jié)約大量的時間。當(dāng)被操作對象數(shù)目不大時,可以直接利用multiprocessing中的Process動態(tài)成生多個進程,十幾個還好,但如果是上百個,上千個目標,手動的去限制進程數(shù)量卻又太過繁瑣,此時可以發(fā)揮進程池的功效。
Pool可以提供指定數(shù)量的進程,供用戶調(diào)用,當(dāng)有新的請求提交到pool中時,如果池還沒有滿,那么就會創(chuàng)建一個新的進程用來執(zhí)行該請求;但如果池中的進程數(shù)已經(jīng)達到規(guī)定最大值,那么該請求就會等待,直到池中有進程結(jié)束,才會創(chuàng)建新的進程來它。
?
例7.1:使用進程池(非阻塞)
#coding: utf-8 import multiprocessing import timedef func(msg):print "msg:", msgtime.sleep(3)print "end"if __name__ == "__main__":pool = multiprocessing.Pool(processes = 3)for i in xrange(4):msg = "hello %d" %(i)pool.apply_async(func, (msg, )) #維持執(zhí)行的進程總數(shù)為processes,當(dāng)一個進程執(zhí)行完畢后會添加新的進程進去print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"pool.close()pool.join() #調(diào)用join之前,先調(diào)用close函數(shù),否則會出錯。執(zhí)行完close后不會有新的進程加入到pool,join函數(shù)等待所有子進程結(jié)束print "Sub-process(es) done."一次執(zhí)行結(jié)果
| 1 2 3 4 5 6 7 8 9 10 | mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ello?0 ? msg: hello?1 msg: hello?2 end msg: hello?3 end end end Sub-process(es) done. |
函數(shù)解釋:
apply_async(func[, args[, kwds[, callback]]])?
它是非阻塞,
apply(func[, args[, kwds]])是阻塞的
close()? ? 關(guān)閉pool,使其不在接受新的任務(wù)。
terminate()? ? 結(jié)束工作進程,不在處理未完成的任務(wù)。
join()? ? 主進程阻塞,等待子進程的退出,?
join方法要在close或terminate之后使用。
執(zhí)行說明:創(chuàng)建一個進程池pool,并設(shè)定進程的數(shù)量為3,xrange(4)會相繼產(chǎn)生四個對象[0, 1, 2, 4],四個對象被提交到pool中,因pool指定進程數(shù)為3,所以0、1、2會直接送到進程中執(zhí)行,當(dāng)其中一個執(zhí)行完事后才空出一個進程處理對象3,所以會出現(xiàn)輸出“msg: hello 3”出現(xiàn)在"end"后。因為為非阻塞,主函數(shù)會自己執(zhí)行自個的,不搭理進程的執(zhí)行,所以運行完for循環(huán)后直接輸出“mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~”,主程序在pool.join()處等待各個進程的結(jié)束。
執(zhí)行說明:創(chuàng)建一個進程池pool,并設(shè)定進程的數(shù)量為3,xrange(4)會相繼產(chǎn)生四個對象[0, 1, 2, 4],
四個對象被提交到pool中,因pool指定進程數(shù)為3,
所以0、1、2會直接送到進程中執(zhí)行,當(dāng)其中一個執(zhí)行完事后才空出一個進程處理對象3.
所以會出現(xiàn)輸出“msg: hello 3”出現(xiàn)在"end"后。
因為為非阻塞,主函數(shù)會自己執(zhí)行自個的,
不搭理進程的執(zhí)行,所以運行完for循環(huán)后直接輸出“mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~”,(也就是說進程都沒執(zhí)行完,主進程就開始做自己的事情了)
主程序在pool.join()處等待各個進程的結(jié)束。
?
例7.2:使用進程池(阻塞)
#coding: utf-8 import multiprocessing import timedef func(msg):print "msg:", msgtime.sleep(3)print "end"if __name__ == "__main__":pool = multiprocessing.Pool(processes = 3)for i in xrange(4):msg = "hello %d" %(i)pool.apply(func, (msg, )) #維持執(zhí)行的進程總數(shù)為processes,當(dāng)一個進程執(zhí)行完畢后會添加新的進程進去print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"pool.close()pool.join() #調(diào)用join之前,先調(diào)用close函數(shù),否則會出錯。執(zhí)行完close后不會有新的進程加入到pool,join函數(shù)等待所有子進程結(jié)束print "Sub-process(es) done."一次執(zhí)行的結(jié)果
| 1 2 3 4 5 6 7 8 9 10 | msg: hello?0 end msg: hello?1 end msg: hello?2 end msg: hello?3 end Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ Sub-process(es) done. |
例7.3:使用進程池,并關(guān)注結(jié)果
import multiprocessing import timedef func(msg):print "msg:", msgtime.sleep(3)print "end"return "done" + msgif __name__ == "__main__":pool = multiprocessing.Pool(processes=4)result = []for i in xrange(3):msg = "hello %d" %(i)result.append(pool.apply_async(func, (msg, )))pool.close()pool.join()for res in result:print ":::", res.get()print "Sub-process(es) done."一次執(zhí)行結(jié)果
| 1 2 3 4 5 6 7 8 9 10 | msg: hello?0 msg: hello?1 msg: hello?2 end end end ::: donehello?0 ::: donehello?1 ::: donehello?2 Sub-process(es) done. |
?
例7.4:使用多個進程池
#coding: utf-8 import multiprocessing import os, time, randomdef Lee():print "\nRun task Lee-%s" %(os.getpid()) #os.getpid()獲取當(dāng)前的進程的IDstart = time.time()time.sleep(random.random() * 10) #random.random()隨機生成0-1之間的小數(shù)end = time.time()print 'Task Lee, runs %0.2f seconds.' %(end - start)def Marlon():print "\nRun task Marlon-%s" %(os.getpid())start = time.time()time.sleep(random.random() * 40)end=time.time()print 'Task Marlon runs %0.2f seconds.' %(end - start)def Allen():print "\nRun task Allen-%s" %(os.getpid())start = time.time()time.sleep(random.random() * 30)end = time.time()print 'Task Allen runs %0.2f seconds.' %(end - start)def Frank():print "\nRun task Frank-%s" %(os.getpid())start = time.time()time.sleep(random.random() * 20)end = time.time()print 'Task Frank runs %0.2f seconds.' %(end - start)if __name__=='__main__':function_list= [Lee, Marlon, Allen, Frank] print "parent process %s" %(os.getpid())pool=multiprocessing.Pool(4)for func in function_list:pool.apply_async(func) #Pool執(zhí)行函數(shù),apply執(zhí)行函數(shù),當(dāng)有一個進程執(zhí)行完畢后,會添加一個新的進程到pool中print 'Waiting for all subprocesses done...'pool.close()pool.join() #調(diào)用join之前,一定要先調(diào)用close() 函數(shù),否則會出錯, close()執(zhí)行后不會有新的進程加入到pool,join函數(shù)等待素有子進程結(jié)束print 'All subprocesses done.'一次執(zhí)行結(jié)果
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | parent process?7704 ? Waiting for?all?subprocesses done... Run task Lee-6948 ? Run task Marlon-2896 ? Run task Allen-7304 ? Run task Frank-3052 Task Lee, runs?1.59?seconds. Task Marlon runs?8.48?seconds. Task Frank runs?15.68?seconds. Task Allen runs?18.08?seconds. All subprocesses done. ? ? ? ?python自2.6開始提供了多進程模塊multiprocessing,這里主要是介紹multiprocessing下的Pool的幾個函數(shù) ? |
總結(jié)
- 上一篇: pipe实现单工和半双工模式
- 下一篇: NLTK的图形化语料文本下载器downl