python 线程池_Python线程池及其原理和使用(超级详细)
系統(tǒng)啟動(dòng)一個(gè)新線程的成本是比較高的,因?yàn)樗婕芭c操作系統(tǒng)的交互。在這種情形下,使用線程池可以很好地提升性能,尤其是當(dāng)程序中需要?jiǎng)?chuàng)建大量生存期很短暫的線程時(shí),更應(yīng)該考慮使用線程池。
線程池在系統(tǒng)啟動(dòng)時(shí)即創(chuàng)建大量空閑的線程,程序只要將一個(gè)函數(shù)提交給線程池,線程池就會(huì)啟動(dòng)一個(gè)空閑的線程來執(zhí)行它。當(dāng)該函數(shù)執(zhí)行結(jié)束后,該線程并不會(huì)死亡,而是再次返回到線程池中變成空閑狀態(tài),等待執(zhí)行下一個(gè)函數(shù)。
此外,使用線程池可以有效地控制系統(tǒng)中并發(fā)線程的數(shù)量。當(dāng)系統(tǒng)中包含有大量的并發(fā)線程時(shí),會(huì)導(dǎo)致系統(tǒng)性能急劇下降,甚至導(dǎo)致 Python 解釋器崩潰,而線程池的最大線程數(shù)參數(shù)可以控制系統(tǒng)中并發(fā)線程的數(shù)量不超過此數(shù)。
線程池的使用
線程池的基類是 concurrent.futures 模塊中的 Executor,Executor 提供了兩個(gè)子類,即 ThreadPoolExecutor 和 ProcessPoolExecutor,其中 ThreadPoolExecutor 用于創(chuàng)建線程池,而 ProcessPoolExecutor 用于創(chuàng)建進(jìn)程池。
如果使用線程池/進(jìn)程池來管理并發(fā)編程,那么只要將相應(yīng)的 task 函數(shù)提交給線程池/進(jìn)程池,剩下的事情就由線程池/進(jìn)程池來搞定。
Exectuor 提供了如下常用方法:
submit(fn, *args, **kwargs):將 fn 函數(shù)提交給線程池。*args 代表傳給 fn 函數(shù)的參數(shù),*kwargs 代表以關(guān)鍵字參數(shù)的形式為 fn 函數(shù)傳入?yún)?shù)。
map(func, *iterables, timeout=None, chunksize=1):該函數(shù)類似于全局函數(shù) map(func, *iterables),只是該函數(shù)將會(huì)啟動(dòng)多個(gè)線程,以異步方式立即對(duì) iterables 執(zhí)行 map 處理。
shutdown(wait=True):關(guān)閉線程池。
程序?qū)?task 函數(shù)提交(submit)給線程池后,submit 方法會(huì)返回一個(gè) Future 對(duì)象,Future 類主要用于獲取線程任務(wù)函數(shù)的返回值。由于線程任務(wù)會(huì)在新線程中以異步方式執(zhí)行,因此,線程執(zhí)行的函數(shù)相當(dāng)于一個(gè)“將來完成”的任務(wù),所以 Python 使用 Future 來代表。
Future 提供了如下方法:
cancel():取消該 Future 代表的線程任務(wù)。如果該任務(wù)正在執(zhí)行,不可取消,則該方法返回 False;否則,程序會(huì)取消該任務(wù),并返回 True。
cancelled():返回 Future 代表的線程任務(wù)是否被成功取消。
running():如果該 Future 代表的線程任務(wù)正在執(zhí)行、不可被取消,該方法返回 True。
done():如果該 Funture 代表的線程任務(wù)被成功取消或執(zhí)行完成,則該方法返回 True。
result(timeout=None):獲取該 Future 代表的線程任務(wù)最后返回的結(jié)果。如果 Future 代表的線程任務(wù)還未完成,該方法將會(huì)阻塞當(dāng)前線程,其中 timeout 參數(shù)指定最多阻塞多少秒。
exception(timeout=None):獲取該 Future 代表的線程任務(wù)所引發(fā)的異常。如果該任務(wù)成功完成,沒有異常,則該方法返回 None。
add_done_callback(fn):為該 Future 代表的線程任務(wù)注冊(cè)一個(gè)“回調(diào)函數(shù)”,當(dāng)該任務(wù)成功完成時(shí),程序會(huì)自動(dòng)觸發(fā)該 fn 函數(shù)。
在用完一個(gè)線程池后,應(yīng)該調(diào)用該線程池的 shutdown() 方法,該方法將啟動(dòng)線程池的關(guān)閉序列。調(diào)用 shutdown() 方法后的線程池不再接收新任務(wù),但會(huì)將以前所有的已提交任務(wù)執(zhí)行完成。當(dāng)線程池中的所有任務(wù)都執(zhí)行完成后,該線程池中的所有線程都會(huì)死亡。
使用線程池來執(zhí)行線程任務(wù)的步驟如下:
調(diào)用 ThreadPoolExecutor 類的構(gòu)造器創(chuàng)建一個(gè)線程池。
定義一個(gè)普通函數(shù)作為線程任務(wù)。
調(diào)用 ThreadPoolExecutor 對(duì)象的 submit() 方法來提交線程任務(wù)。
當(dāng)不想提交任何任務(wù)時(shí),調(diào)用 ThreadPoolExecutor 對(duì)象的 shutdown() 方法來關(guān)閉線程池。
下面程序示范了如何使用線程池來執(zhí)行線程任務(wù):
1 def test(value1, value2=None):2 print("%s threading is printed %s, %s"%(threading.current_thread().name, value1, value2))3 time.sleep(2)4 return 'finished'
5
6 deftest_result(future):7 print(future.result())8
9 if __name__ == "__main__":10 importnumpy as np11 from concurrent.futures importThreadPoolExecutor12 threadPool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="test_")13 for i in range(0,10):14 future = threadPool.submit(test, i,i+1)15
16 threadPool.shutdown(wait=True)
1 結(jié)果:2
3 test__0 threading is printed 0, 1
4 test__1 threading is printed 1, 2
5 test__2 threading is printed 2, 3
6 test__3 threading is printed 3, 4
7 test__1 threading is printed 4, 5
8 test__0 threading is printed 5, 6
9 test__3 threading is printed 6, 7
獲取執(zhí)行結(jié)果
前面程序調(diào)用了 Future 的 result() 方法來獲取線程任務(wù)的運(yùn)回值,但該方法會(huì)阻塞當(dāng)前主線程,只有等到錢程任務(wù)完成后,result() 方法的阻塞才會(huì)被解除。
如果程序不希望直接調(diào)用 result() 方法阻塞線程,則可通過 Future 的 add_done_callback() 方法來添加回調(diào)函數(shù),該回調(diào)函數(shù)形如 fn(future)。當(dāng)線程任務(wù)完成后,程序會(huì)自動(dòng)觸發(fā)該回調(diào)函數(shù),并將對(duì)應(yīng)的 Future 對(duì)象作為參數(shù)傳給該回調(diào)函數(shù)。
直接調(diào)用result函數(shù)結(jié)果
1 def test(value1, value2=None):2 print("%s threading is printed %s, %s"%(threading.current_thread().name, value1, value2))3 time.sleep(2)4 return 'finished'
5
6 deftest_result(future):7 print(future.result())8
9 if __name__ == "__main__":10 importnumpy as np11 from concurrent.futures importThreadPoolExecutor12 threadPool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="test_")13 for i in range(0,10):14 future = threadPool.submit(test, i,i+1)15 #future.add_done_callback(test_result)
16 print(future.result())17
18 threadPool.shutdown(wait=True)19 print('main finished')
1 結(jié)果:2
3 test__0 threading is printed 0, 1
4 finished5 test__0 threading is printed 1, 2
6 finished7 test__1 threading is printed 2, 3
8 finished
去掉上面注釋部分,調(diào)用future.add_done_callback函數(shù),注釋掉第16行
1 test__0 threading is printed 0, 1
2 test__1 threading is printed 1, 2
3 test__2 threading is printed 2, 3
4 test__3 threading is printed 3, 4
5 finished6 finished7 finished8 test__1 threading is printed 4, 5
9 test__0 threading is printed 5, 6
10 finished
另外,由于線程池實(shí)現(xiàn)了上下文管理協(xié)議(Context Manage Protocol),因此,程序可以使用 with 語句來管理線程池,這樣即可避免手動(dòng)關(guān)閉線程池,如上面的程序所示。
此外,Exectuor 還提供了一個(gè) map(func, *iterables, timeout=None, chunksize=1) 方法,該方法的功能類似于全局函數(shù) map(),區(qū)別在于線程池的 map() 方法會(huì)為 iterables 的每個(gè)元素啟動(dòng)一個(gè)線程,以并發(fā)方式來執(zhí)行 func 函數(shù)。這種方式相當(dāng)于啟動(dòng) len(iterables) 個(gè)線程,井收集每個(gè)線程的執(zhí)行結(jié)果。
例如,如下程序使用 Executor 的 map() 方法來啟動(dòng)線程,并收集線程任務(wù)的返回值:
示例換成多參數(shù)的:
def test(value1, value2=None):print("%s threading is printed %s, %s"%(threading.current_thread().name, value1, value2))#time.sleep(2)
if __name__ == "__main__":importnumpy as npfrom concurrent.futures importThreadPoolExecutor
threadPool= ThreadPoolExecutor(max_workers=4, thread_name_prefix="test_")for i in range(0,10):#test(str(i), str(i+1))
threadPool.map(test, [i],[i+1]) # 這是運(yùn)行一次test的參數(shù),眾所周知map可以讓test執(zhí)行多次,即一個(gè)[]代表一個(gè)參數(shù),一個(gè)參數(shù)賦予不同的值即增加[]的長(zhǎng)度如從[1]到[1,2,3]
threadPool.shutdown(wait=True)
上面程序使用 map() 方法來啟動(dòng) 4個(gè)線程(該程序的線程池包含 4 個(gè)線程,如果繼續(xù)使用只包含兩個(gè)線程的線程池,此時(shí)將有一個(gè)任務(wù)處于等待狀態(tài),必須等其中一個(gè)任務(wù)完成,線程空閑出來才會(huì)獲得執(zhí)行的機(jī)會(huì)),map() 方法的返回值將會(huì)收集每個(gè)線程任務(wù)的返回結(jié)果。
通過上面程序可以看出,使用 map() 方法來啟動(dòng)線程,并收集線程的執(zhí)行結(jié)果,不僅具有代碼簡(jiǎn)單的優(yōu)點(diǎn),而且雖然程序會(huì)以并發(fā)方式來執(zhí)行 test() 函數(shù),但最后收集的 test() 函數(shù)的執(zhí)行結(jié)果,依然與傳入?yún)?shù)的結(jié)果保持一致。
編寫這個(gè)文檔主要是因?yàn)槭纠臋n[1]沒有多參數(shù)的。網(wǎng)上很多資料都是基于threadpool方法傳參見[2]
Reference:
[1] http://c.biancheng.net/view/2627.html
[2] https://www.cnblogs.com/gongxijun/p/6862333.html
總結(jié)
以上是生活随笔為你收集整理的python 线程池_Python线程池及其原理和使用(超级详细)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 异步通知和同步通知_CCF NOI 20
- 下一篇: jupyter notebook使用op