日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > python >内容正文

python

python 线程池_Python线程池及其原理和使用(超级详细)

發(fā)布時(shí)間:2024/1/23 python 21 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python 线程池_Python线程池及其原理和使用(超级详细) 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

系統(tǒng)啟動(dòng)一個(gè)新線程的成本是比較高的,因?yàn)樗婕芭c操作系統(tǒng)的交互。在這種情形下,使用線程池可以很好地提升性能,尤其是當(dāng)程序中需要?jiǎng)?chuàng)建大量生存期很短暫的線程時(shí),更應(yīng)該考慮使用線程池。

線程池在系統(tǒng)啟動(dòng)時(shí)即創(chuàng)建大量空閑的線程,程序只要將一個(gè)函數(shù)提交給線程池,線程池就會(huì)啟動(dòng)一個(gè)空閑的線程來執(zhí)行它。當(dāng)該函數(shù)執(zhí)行結(jié)束后,該線程并不會(huì)死亡,而是再次返回到線程池中變成空閑狀態(tài),等待執(zhí)行下一個(gè)函數(shù)。

此外,使用線程池可以有效地控制系統(tǒng)中并發(fā)線程的數(shù)量。當(dāng)系統(tǒng)中包含有大量的并發(fā)線程時(shí),會(huì)導(dǎo)致系統(tǒng)性能急劇下降,甚至導(dǎo)致 Python 解釋器崩潰,而線程池的最大線程數(shù)參數(shù)可以控制系統(tǒng)中并發(fā)線程的數(shù)量不超過此數(shù)。

線程池的使用

線程池的基類是 concurrent.futures 模塊中的 Executor,Executor 提供了兩個(gè)子類,即 ThreadPoolExecutor 和 ProcessPoolExecutor,其中 ThreadPoolExecutor 用于創(chuàng)建線程池,而 ProcessPoolExecutor 用于創(chuàng)建進(jìn)程池。

如果使用線程池/進(jìn)程池來管理并發(fā)編程,那么只要將相應(yīng)的 task 函數(shù)提交給線程池/進(jìn)程池,剩下的事情就由線程池/進(jìn)程池來搞定。

Exectuor 提供了如下常用方法:

submit(fn, *args, **kwargs):將 fn 函數(shù)提交給線程池。*args 代表傳給 fn 函數(shù)的參數(shù),*kwargs 代表以關(guān)鍵字參數(shù)的形式為 fn 函數(shù)傳入?yún)?shù)。

map(func, *iterables, timeout=None, chunksize=1):該函數(shù)類似于全局函數(shù) map(func, *iterables),只是該函數(shù)將會(huì)啟動(dòng)多個(gè)線程,以異步方式立即對(duì) iterables 執(zhí)行 map 處理。

shutdown(wait=True):關(guān)閉線程池。

程序?qū)?task 函數(shù)提交(submit)給線程池后,submit 方法會(huì)返回一個(gè) Future 對(duì)象,Future 類主要用于獲取線程任務(wù)函數(shù)的返回值。由于線程任務(wù)會(huì)在新線程中以異步方式執(zhí)行,因此,線程執(zhí)行的函數(shù)相當(dāng)于一個(gè)“將來完成”的任務(wù),所以 Python 使用 Future 來代表。

Future 提供了如下方法:

cancel():取消該 Future 代表的線程任務(wù)。如果該任務(wù)正在執(zhí)行,不可取消,則該方法返回 False;否則,程序會(huì)取消該任務(wù),并返回 True。

cancelled():返回 Future 代表的線程任務(wù)是否被成功取消。

running():如果該 Future 代表的線程任務(wù)正在執(zhí)行、不可被取消,該方法返回 True。

done():如果該 Funture 代表的線程任務(wù)被成功取消或執(zhí)行完成,則該方法返回 True。

result(timeout=None):獲取該 Future 代表的線程任務(wù)最后返回的結(jié)果。如果 Future 代表的線程任務(wù)還未完成,該方法將會(huì)阻塞當(dāng)前線程,其中 timeout 參數(shù)指定最多阻塞多少秒。

exception(timeout=None):獲取該 Future 代表的線程任務(wù)所引發(fā)的異常。如果該任務(wù)成功完成,沒有異常,則該方法返回 None。

add_done_callback(fn):為該 Future 代表的線程任務(wù)注冊(cè)一個(gè)“回調(diào)函數(shù)”,當(dāng)該任務(wù)成功完成時(shí),程序會(huì)自動(dòng)觸發(fā)該 fn 函數(shù)。

在用完一個(gè)線程池后,應(yīng)該調(diào)用該線程池的 shutdown() 方法,該方法將啟動(dòng)線程池的關(guān)閉序列。調(diào)用 shutdown() 方法后的線程池不再接收新任務(wù),但會(huì)將以前所有的已提交任務(wù)執(zhí)行完成。當(dāng)線程池中的所有任務(wù)都執(zhí)行完成后,該線程池中的所有線程都會(huì)死亡。

使用線程池來執(zhí)行線程任務(wù)的步驟如下:

調(diào)用 ThreadPoolExecutor 類的構(gòu)造器創(chuàng)建一個(gè)線程池。

定義一個(gè)普通函數(shù)作為線程任務(wù)。

調(diào)用 ThreadPoolExecutor 對(duì)象的 submit() 方法來提交線程任務(wù)。

當(dāng)不想提交任何任務(wù)時(shí),調(diào)用 ThreadPoolExecutor 對(duì)象的 shutdown() 方法來關(guān)閉線程池。

下面程序示范了如何使用線程池來執(zhí)行線程任務(wù):

1 def test(value1, value2=None):2 print("%s threading is printed %s, %s"%(threading.current_thread().name, value1, value2))3 time.sleep(2)4 return 'finished'

5

6 deftest_result(future):7 print(future.result())8

9 if __name__ == "__main__":10 importnumpy as np11 from concurrent.futures importThreadPoolExecutor12 threadPool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="test_")13 for i in range(0,10):14 future = threadPool.submit(test, i,i+1)15

16 threadPool.shutdown(wait=True)

1 結(jié)果:2

3 test__0 threading is printed 0, 1

4 test__1 threading is printed 1, 2

5 test__2 threading is printed 2, 3

6 test__3 threading is printed 3, 4

7 test__1 threading is printed 4, 5

8 test__0 threading is printed 5, 6

9 test__3 threading is printed 6, 7

獲取執(zhí)行結(jié)果

前面程序調(diào)用了 Future 的 result() 方法來獲取線程任務(wù)的運(yùn)回值,但該方法會(huì)阻塞當(dāng)前主線程,只有等到錢程任務(wù)完成后,result() 方法的阻塞才會(huì)被解除。

如果程序不希望直接調(diào)用 result() 方法阻塞線程,則可通過 Future 的 add_done_callback() 方法來添加回調(diào)函數(shù),該回調(diào)函數(shù)形如 fn(future)。當(dāng)線程任務(wù)完成后,程序會(huì)自動(dòng)觸發(fā)該回調(diào)函數(shù),并將對(duì)應(yīng)的 Future 對(duì)象作為參數(shù)傳給該回調(diào)函數(shù)。

直接調(diào)用result函數(shù)結(jié)果

1 def test(value1, value2=None):2 print("%s threading is printed %s, %s"%(threading.current_thread().name, value1, value2))3 time.sleep(2)4 return 'finished'

5

6 deftest_result(future):7 print(future.result())8

9 if __name__ == "__main__":10 importnumpy as np11 from concurrent.futures importThreadPoolExecutor12 threadPool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="test_")13 for i in range(0,10):14 future = threadPool.submit(test, i,i+1)15 #future.add_done_callback(test_result)

16 print(future.result())17

18 threadPool.shutdown(wait=True)19 print('main finished')

1 結(jié)果:2

3 test__0 threading is printed 0, 1

4 finished5 test__0 threading is printed 1, 2

6 finished7 test__1 threading is printed 2, 3

8 finished

去掉上面注釋部分,調(diào)用future.add_done_callback函數(shù),注釋掉第16行

1 test__0 threading is printed 0, 1

2 test__1 threading is printed 1, 2

3 test__2 threading is printed 2, 3

4 test__3 threading is printed 3, 4

5 finished6 finished7 finished8 test__1 threading is printed 4, 5

9 test__0 threading is printed 5, 6

10 finished

另外,由于線程池實(shí)現(xiàn)了上下文管理協(xié)議(Context Manage Protocol),因此,程序可以使用 with 語句來管理線程池,這樣即可避免手動(dòng)關(guān)閉線程池,如上面的程序所示。

此外,Exectuor 還提供了一個(gè) map(func, *iterables, timeout=None, chunksize=1) 方法,該方法的功能類似于全局函數(shù) map(),區(qū)別在于線程池的 map() 方法會(huì)為 iterables 的每個(gè)元素啟動(dòng)一個(gè)線程,以并發(fā)方式來執(zhí)行 func 函數(shù)。這種方式相當(dāng)于啟動(dòng) len(iterables) 個(gè)線程,井收集每個(gè)線程的執(zhí)行結(jié)果。

例如,如下程序使用 Executor 的 map() 方法來啟動(dòng)線程,并收集線程任務(wù)的返回值:

示例換成多參數(shù)的:

def test(value1, value2=None):print("%s threading is printed %s, %s"%(threading.current_thread().name, value1, value2))#time.sleep(2)

if __name__ == "__main__":importnumpy as npfrom concurrent.futures importThreadPoolExecutor

threadPool= ThreadPoolExecutor(max_workers=4, thread_name_prefix="test_")for i in range(0,10):#test(str(i), str(i+1))

threadPool.map(test, [i],[i+1]) # 這是運(yùn)行一次test的參數(shù),眾所周知map可以讓test執(zhí)行多次,即一個(gè)[]代表一個(gè)參數(shù),一個(gè)參數(shù)賦予不同的值即增加[]的長(zhǎng)度如從[1]到[1,2,3]

threadPool.shutdown(wait=True)

上面程序使用 map() 方法來啟動(dòng) 4個(gè)線程(該程序的線程池包含 4 個(gè)線程,如果繼續(xù)使用只包含兩個(gè)線程的線程池,此時(shí)將有一個(gè)任務(wù)處于等待狀態(tài),必須等其中一個(gè)任務(wù)完成,線程空閑出來才會(huì)獲得執(zhí)行的機(jī)會(huì)),map() 方法的返回值將會(huì)收集每個(gè)線程任務(wù)的返回結(jié)果。

通過上面程序可以看出,使用 map() 方法來啟動(dòng)線程,并收集線程的執(zhí)行結(jié)果,不僅具有代碼簡(jiǎn)單的優(yōu)點(diǎn),而且雖然程序會(huì)以并發(fā)方式來執(zhí)行 test() 函數(shù),但最后收集的 test() 函數(shù)的執(zhí)行結(jié)果,依然與傳入?yún)?shù)的結(jié)果保持一致。

編寫這個(gè)文檔主要是因?yàn)槭纠臋n[1]沒有多參數(shù)的。網(wǎng)上很多資料都是基于threadpool方法傳參見[2]

Reference:

[1] http://c.biancheng.net/view/2627.html

[2] https://www.cnblogs.com/gongxijun/p/6862333.html

總結(jié)

以上是生活随笔為你收集整理的python 线程池_Python线程池及其原理和使用(超级详细)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。