日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

进程池Pool

發(fā)布時間:2023/12/20 编程问答 20 豆豆
生活随笔 收集整理的這篇文章主要介紹了 进程池Pool 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Pool

在利用Python進行系統(tǒng)管理的時候,特別是同時操作多個文件目錄,或者遠程控制多臺主機,并行操作可以節(jié)約大量的時間。當(dāng)被操作對象數(shù)目不大時,可以直接利用multiprocessing中的Process動態(tài)成生多個進程,十幾個還好,但如果是上百個,上千個目標,手動的去限制進程數(shù)量卻又太過繁瑣,此時可以發(fā)揮進程池的功效。
Pool可以提供指定數(shù)量的進程,供用戶調(diào)用,當(dāng)有新的請求提交到pool中時,如果池還沒有滿,那么就會創(chuàng)建一個新的進程用來執(zhí)行該請求;但如果池中的進程數(shù)已經(jīng)達到規(guī)定最大值,那么該請求就會等待,直到池中有進程結(jié)束,才會創(chuàng)建新的進程來它。

?

例7.1:使用進程池(非阻塞)

#coding: utf-8 import multiprocessing import timedef func(msg):print "msg:", msgtime.sleep(3)print "end"if __name__ == "__main__":pool = multiprocessing.Pool(processes = 3)for i in xrange(4):msg = "hello %d" %(i)pool.apply_async(func, (msg, )) #維持執(zhí)行的進程總數(shù)為processes,當(dāng)一個進程執(zhí)行完畢后會添加新的進程進去print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"pool.close()pool.join() #調(diào)用join之前,先調(diào)用close函數(shù),否則會出錯。執(zhí)行完close后不會有新的進程加入到pool,join函數(shù)等待所有子進程結(jié)束print "Sub-process(es) done."

一次執(zhí)行結(jié)果

1

2

3

4

5

6

7

8

9

10

mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ello?0

?

msg: hello?1

msg: hello?2

end

msg: hello?3

end

end

end

Sub-process(es) done.

函數(shù)解釋:

apply_async(func[, args[, kwds[, callback]]])?
它是非阻塞,
apply(func[, args[, kwds]])是阻塞的
close()? ? 關(guān)閉pool,使其不在接受新的任務(wù)。
terminate()? ? 結(jié)束工作進程,不在處理未完成的任務(wù)。
join()? ? 主進程阻塞,等待子進程的退出,?
join方法要在close或terminate之后使用。
執(zhí)行說明:創(chuàng)建一個進程池pool,并設(shè)定進程的數(shù)量為3,xrange(4)會相繼產(chǎn)生四個對象[0, 1, 2, 4],四個對象被提交到pool中,因pool指定進程數(shù)為3,所以0、1、2會直接送到進程中執(zhí)行,當(dāng)其中一個執(zhí)行完事后才空出一個進程處理對象3,所以會出現(xiàn)輸出“msg: hello 3”出現(xiàn)在"end"后。因為為非阻塞,主函數(shù)會自己執(zhí)行自個的,不搭理進程的執(zhí)行,所以運行完for循環(huán)后直接輸出“mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~”,主程序在pool.join()處等待各個進程的結(jié)束。



執(zhí)行說明:創(chuàng)建一個進程池pool,并設(shè)定進程的數(shù)量為3,xrange(4)會相繼產(chǎn)生四個對象[0, 1, 2, 4],
四個對象被提交到pool中,因pool指定進程數(shù)為3,
所以0、1、2會直接送到進程中執(zhí)行,當(dāng)其中一個執(zhí)行完事后才空出一個進程處理對象3.


所以會出現(xiàn)輸出“msg: hello 3”出現(xiàn)在"end"后。
因為為非阻塞,主函數(shù)會自己執(zhí)行自個的,
不搭理進程的執(zhí)行,所以運行完for循環(huán)后直接輸出“mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~”,(也就是說進程都沒執(zhí)行完,主進程就開始做自己的事情了)
主程序在pool.join()處等待各個進程的結(jié)束。

?

例7.2:使用進程池(阻塞)

#coding: utf-8 import multiprocessing import timedef func(msg):print "msg:", msgtime.sleep(3)print "end"if __name__ == "__main__":pool = multiprocessing.Pool(processes = 3)for i in xrange(4):msg = "hello %d" %(i)pool.apply(func, (msg, )) #維持執(zhí)行的進程總數(shù)為processes,當(dāng)一個進程執(zhí)行完畢后會添加新的進程進去print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"pool.close()pool.join() #調(diào)用join之前,先調(diào)用close函數(shù),否則會出錯。執(zhí)行完close后不會有新的進程加入到pool,join函數(shù)等待所有子進程結(jié)束print "Sub-process(es) done."

一次執(zhí)行的結(jié)果

1

2

3

4

5

6

7

8

9

10

msg: hello?0

end

msg: hello?1

end

msg: hello?2

end

msg: hello?3

end

Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~

Sub-process(es) done.

  

例7.3:使用進程池,并關(guān)注結(jié)果

import multiprocessing import timedef func(msg):print "msg:", msgtime.sleep(3)print "end"return "done" + msgif __name__ == "__main__":pool = multiprocessing.Pool(processes=4)result = []for i in xrange(3):msg = "hello %d" %(i)result.append(pool.apply_async(func, (msg, )))pool.close()pool.join()for res in result:print ":::", res.get()print "Sub-process(es) done."

一次執(zhí)行結(jié)果

1

2

3

4

5

6

7

8

9

10

msg: hello?0

msg: hello?1

msg: hello?2

end

end

end

::: donehello?0

::: donehello?1

::: donehello?2

Sub-process(es) done.

?

例7.4:使用多個進程池

#coding: utf-8 import multiprocessing import os, time, randomdef Lee():print "\nRun task Lee-%s" %(os.getpid()) #os.getpid()獲取當(dāng)前的進程的IDstart = time.time()time.sleep(random.random() * 10) #random.random()隨機生成0-1之間的小數(shù)end = time.time()print 'Task Lee, runs %0.2f seconds.' %(end - start)def Marlon():print "\nRun task Marlon-%s" %(os.getpid())start = time.time()time.sleep(random.random() * 40)end=time.time()print 'Task Marlon runs %0.2f seconds.' %(end - start)def Allen():print "\nRun task Allen-%s" %(os.getpid())start = time.time()time.sleep(random.random() * 30)end = time.time()print 'Task Allen runs %0.2f seconds.' %(end - start)def Frank():print "\nRun task Frank-%s" %(os.getpid())start = time.time()time.sleep(random.random() * 20)end = time.time()print 'Task Frank runs %0.2f seconds.' %(end - start)if __name__=='__main__':function_list= [Lee, Marlon, Allen, Frank] print "parent process %s" %(os.getpid())pool=multiprocessing.Pool(4)for func in function_list:pool.apply_async(func) #Pool執(zhí)行函數(shù),apply執(zhí)行函數(shù),當(dāng)有一個進程執(zhí)行完畢后,會添加一個新的進程到pool中print 'Waiting for all subprocesses done...'pool.close()pool.join() #調(diào)用join之前,一定要先調(diào)用close() 函數(shù),否則會出錯, close()執(zhí)行后不會有新的進程加入到pool,join函數(shù)等待素有子進程結(jié)束print 'All subprocesses done.'

一次執(zhí)行結(jié)果

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

parent process?7704

?

Waiting for?all?subprocesses done...

Run task Lee-6948

?

Run task Marlon-2896

?

Run task Allen-7304

?

Run task Frank-3052

Task Lee, runs?1.59?seconds.

Task Marlon runs?8.48?seconds.

Task Frank runs?15.68?seconds.

Task Allen runs?18.08?seconds.

All subprocesses done.

?

?

?

?python自2.6開始提供了多進程模塊multiprocessing,這里主要是介紹multiprocessing下的Pool的幾個函數(shù)
一 apply(func[, args[, kwds]])
apply用于傳遞不定參數(shù),同python中的apply函數(shù)一致(不過內(nèi)置的apply函數(shù)從2.3以后就不建議使用了),主進程會阻塞于函數(shù)。
for x in gen_list(l):
result = pool.apply(pool_test, (x,))
print 'main process'
這個時候主進程的執(zhí)行流程同單進程一致
二 apply_async(func[, args[, kwds[, callback]]])
與apply用法一致,但它是非阻塞的且支持結(jié)果返回后進行回調(diào)。
for x in gen_list(l):
result = pool.apply_async(pool_test, (x,))
print 'main process'
這個時候主進程循環(huán)運行過程中不等待apply_async的返回結(jié)果,在主進程結(jié)束后,即使子進程還未返回整個程序也會就退出。雖然 apply_async是非阻塞的,但其返回結(jié)果的get方法卻是阻塞的,在本例中result.get()會阻塞主進程。因此可以這樣來處理返回結(jié)果:
[x.get() for x in [pool.apply_async(pool_test, (x,)) for x in gen_list(l)]]
如果我們對返回結(jié)果不感興趣, 那么可以在主進程中使用pool.close與pool.join來防止主進程退出。注意join方法一定要在close或terminate之后調(diào)用。
for x in gen_list(l):
pool.apply_async(pool_test, (x, ))
print 'main_process'
pool.close()
pool.join()
三 map(func, iterable[, chunksize])
map方法與內(nèi)置的map函數(shù)行為基本一致,在它會使進程阻塞與此直到結(jié)果返回。
但需注意的是其第二個參數(shù)雖然描述的為iterable, 但在實際使用中發(fā)現(xiàn)只有在整個隊列全部就緒后,程序才會運行子進程。
四 map_async(func, iterable[, chunksize[, callback]])
與map用法一致,但是它是非阻塞的。其有關(guān)事項見apply_async。
五 imap(func, iterable[, chunksize])
與map不同的是, imap的返回結(jié)果為iter,需要在主進程中主動使用next來驅(qū)動子進程的調(diào)用。即使子進程沒有返回結(jié)果,主進程對于gen_list(l)的 iter還是會繼續(xù)進行, 另外根據(jù)python2.6文檔的描述,對于大數(shù)據(jù)量的iterable而言,將chunksize設(shè)置大一些比默認的1要好。
for x in pool.imap(pool_test, gen_list(l)):
pass
六 imap_unordered(func, iterable[, chunksize])
同imap一致,只不過其并不保證返回結(jié)果與迭代傳入的順序一致。
七 close()
關(guān)閉pool,使其不在接受新的任務(wù)。
八 terminate()
結(jié)束工作進程,不在處理未處理的任務(wù)。
九 join()
主進程阻塞等待子進程的退出, join方法要在close或terminate之后使用。
l = range(10)
def gen_list(l):
for x in l:
print 'yield', x
yield x
def pool_test(x):
print 'f2', x
time.sleep(1)

?

總結(jié)

以上是生活随笔為你收集整理的进程池Pool的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。