进程池、线程池、回调函数、协程
閱讀目錄
摘要:
- 進程池與線程池
- 同步調用和異步調用
- 回調函數
- 協程
一、進程池與線程池:
1、池的概念:
不管是線程還是進程,都不能無限制的開下去,總會消耗和占用資源。
也就是說,硬件的承載能力是有限度的,在保證高效率工作的同時應該還需要保證硬件的資源占用情況,所以需要給硬件設置一個上限來減輕硬件的壓力,所以就有了池的概念。
2、進程池與線程池的使用方法:(進程與線程的創建基本相似,所以進程池與線程池的使用過程也基本一樣)
-------------------------------------------------------------------- 注:如果你對python感興趣,我這有個學習Python基地,里面有很多學習資料,感興趣的+Q群:895817687 --------------------------------------------------------------------from concurrent.futures import ProcessPoolExecutor # 導入進程池模塊 from concurrent.futures import ThreadPoolExecutor # 導入線程池模塊 import os import time import random# 下面以進程池為例,線程池只是使用導入模塊不一樣,僅此而已。 def task(name):print('name:[%s]|進程:[%s]正在運行' % (name, os.getpid()))time.sleep(random.randint(1, 3)) # 模擬進程運行耗費時間。# 這一步的必要性:在創建進程時,會將代碼以模塊的方式從頭到尾導入加載執行一遍 # (所以創建線程如果不寫在main里面的話,這個py文件里面的所有代碼都會從頭到尾加載執行一遍 # 就會導致在創建進程的時候產生死循環。) if __name__ == '__main__':pool = ProcessPoolExecutor(4) # 設置線程池的大小,默認等于cpu的核心數。for i in range(10):pool.submit(task, '進程%s' % i) # 異步提交(提交后不等待)pool.shutdown(wait=True) # 關閉進程池入口不再提交,同時等待進程池全部運行完畢。(類似join方法)print('主') # 標識一下主進程的完畢之前的語句運行結果
# 運行過程及結果: name:[進程0]|進程:[4080]正在運行 name:[進程1]|進程:[18336]正在運行 name:[進程2]|進程:[19864]正在運行 name:[進程3]|進程:[25604]正在運行 name:[進程4]|進程:[4080]正在運行 name:[進程5]|進程:[18336]正在運行 name:[進程6]|進程:[4080]正在運行 name:[進程7]|進程:[19864]正在運行 name:[進程8]|進程:[25604]正在運行 name:[進程9]|進程:[18336]正在運行 主二、同步調用、異步調用
同步調用:提交任務,原地等待該任務執行完畢,拿到結果后再執行下一個任務,導致程序串行執行!
from concurrent.futures import ProcessPoolExecutor # 導入進程池模塊 from concurrent.futures import ThreadPoolExecutor # 導入線程池模塊 import os import time import randomdef task(name):print('name:[%s]|進程[%s]正在運行...' % (name, os.getpid()))time.sleep(random.randint(1, 3))return '拿到[%s]|進程%s的結果...' % (name, os.getpid())if __name__ == '__main__':pool = ProcessPoolExecutor(4)result = [] # 創建一個空列表來搜集執行結果for i in range(10):res = pool.submit(task, '進程%s' % i).result() # 使用.result()方法得到每次的結果,同步調用result.append(res)pool.shutdown(wait=True)for j in result:print(j)print('主進程')過程和結果
# 執行結果: name:[進程0]|進程[3376]正在運行... name:[進程1]|進程[27124]正在運行... name:[進程2]|進程[10176]正在運行... name:[進程3]|進程[28636]正在運行... name:[進程4]|進程[3376]正在運行... name:[進程5]|進程[27124]正在運行... name:[進程6]|進程[10176]正在運行... name:[進程7]|進程[28636]正在運行... name:[進程8]|進程[3376]正在運行... name:[進程9]|進程[27124]正在運行... 拿到[進程0]|進程3376的結果... 拿到[進程1]|進程27124的結果... 拿到[進程2]|進程10176的結果... 拿到[進程3]|進程28636的結果... 拿到[進程4]|進程3376的結果... 拿到[進程5]|進程27124的結果... 拿到[進程6]|進程10176的結果... 拿到[進程7]|進程28636的結果... 拿到[進程8]|進程3376的結果... 拿到[進程9]|進程27124的結果... 主進程異步調用:提交任務,不去等結果,繼續執行。
from concurrent.futures import ProcessPoolExecutor import os import random import timedef task(name):time.sleep(random.randint(1, 3))print('name: %s 進程[%s]運行...' % (name, os.getpid()))if __name__ == '__main__':pool = ProcessPoolExecutor(4)for i in range(10):pool.submit(task, '進程%s' % i) # 異步調用,提交后不等待結果,繼續執行代碼pool.shutdown(wait=True)print('主進程')過程和結果
name: 進程3 進程[10016]運行... name: 進程0 進程[12736]運行... name: 進程1 進程[4488]運行... name: 進程2 進程[3920]運行... name: 進程5 進程[12736]運行... name: 進程6 進程[4488]運行... name: 進程4 進程[10016]運行... name: 進程9 進程[4488]運行... name: 進程8 進程[12736]運行... name: 進程7 進程[3920]運行... 主進程三、回調函數:
上面我們在演示異步調用時候,說過提交任務不等待執行結果,繼續往下執行代碼,那么,執行的結果我們怎么得到呢?
可以為進程池和線程池內的每個進程或線程綁定一個函數,該函數在進程或線程的任務執行完畢后自動觸發并接收任務的返回值當做參數,這個函數就是回調函數。
from concurrent.futures import ThreadPoolExecutor import time import random import requestsdef task(url):print('獲取網站[%s]信息' % url)response = requests.get(url) # 下載頁面time.sleep(random.randint(1, 3))return {'url': url, 'content': response.text} # 返回結果:頁面地址和頁面內容futures = [] def back(res):res = res.result() # 取到提交任務的結果(回調函數固定寫法)res = '網站[%s]內容長度:%s' % (res.get('url'), len(res.get('content')))futures.append(res)return futuresif __name__ == '__main__':urls = ['http://www.baidu.com','http://www.dgtle.com/','https://www.bilibili.com/']pool = ThreadPoolExecutor(4)futures = []for i in urls:pool.submit(task, i).add_done_callback(back) # 執行完線程后,使用回調函數pool.shutdown(wait=True)for j in futures:print(j)結果
獲取網站[http://www.baidu.com]信息 獲取網站[http://www.dgtle.com/]信息 獲取網站[https://www.bilibili.com/]信息 網站[http://www.dgtle.com/]內容長度:39360 網站[https://www.bilibili.com/]內容長度:69377 網站[http://www.baidu.com]內容長度:2381四:協程(通過單線程實現并發)
我們知道,多個線程執行任務時候,如果其中一個任務遇到IO,操作系統會有一種來回’切’的機制,來最大效率利用cpu的使用效率,從而實現多線程并發效果
而協程:就是用單線程實現并發,通過軟件代碼手段,在代碼執行過程中遇到IO,自動切換到進程中的另外一個執行的代碼,然后再次遇到IO,繼續切換到另一個
執行的代碼。
過程就是:單進程中任務執行中:遇到IO,代碼層面在單線程中切換代碼執行。從而騙過操作系統,讓操作系統以為這個單線程好像沒經歷過IO,從而達到該
單線程對cpu使用的效率最大化。
實現過程所需模塊:gevent
from gevent import monkey; monkey.patch_all() # 監測代碼中所有IO行為 # gevent模塊不能識別它本身以外的所有的IO行為,但是它內部封裝了一個模塊,能夠幫助我們識別所有的IO行為from gevent import spawn # 從gevent模塊導入spawn,來使用‘切’的方法 import time import randomdef heng(name):print('%s 哼了一下...' % name)time.sleep(random.randint(1, 3))print('%s 哼完了' % name)def ha(name):print('%s 哈了一下' % name)time.sleep(random.randint(1, 3))print('%s 哈完了' % name)start = time.time() # 標記開始時間 s1 = spawn(heng, '王大錘') # 標記并運行heng函數(遇到IO,切) s2 = spawn(ha, '至尊寶') # 標記并運行ha函數(遇到IO,切)s1.join() s2.join() # s1、s2都執行完畢后才繼續執行 print('運行時間:', time.time()-start)結果
王大錘 哼了一下... 至尊寶 哈了一下 王大錘 哼完了 至尊寶 哈完了 運行時間: 2.0049164295196533進程:資源單位
線程:執行單位
協程:單線程下實現并發(能夠在多個任務之間切換和保存狀態來節省IO),這里注意區分操作系統的切換+保存狀態是針對多個線程而言,而我們現在是想在單個線程下自己手動實現操作系統的切換+保存狀態的功能
注意協程這個概念完全是程序員自己想出來的東西,它對于操作系統來說根本不存在。操作系統只知道進程和線程。并且需要注意的是并不是單個線程下實現切換+保存狀態就能提升效率,因為你可能是沒有遇到io也切,那反而會降低效率
再回過頭來想上面的socket服務端實現并發的例子,單個線程服務端在建立連接的時候無法去干通信的活,在干通信的時候也無法去干連接的活。這兩者肯定都會有IO,如果能夠實現通信io了我就去干建連接,建連接io了我就去干通信,那其實我們就可以實現單線程下實現并發
將單個線程的效率提升到最高,多進程下開多線程,多線程下用協程>>> 實現高并發!!!
協程實現服務端客戶端通信
# 服務端: from gevent import monkey;monkey.patch_all() from gevent import spawn import socket n = 0def communicate(conn):while True:try:data = conn.recv(1024)if len(data) == 0:breakprint(data.decode('utf-8'))conn.send(data.upper())except ConnectionResetError:breakconn.close()def server(): # 切點global nserver = socket.socket()server.bind(('127.0.0.1',8080))server.listen(5)while True:conn, addr = server.accept()# n += 1spawn(communicate, conn) # 切點# print(n)if __name__ == '__main__':s1 = spawn(server)s1.join() # 客戶端 from threading import Thread,current_thread import socketdef client():client = socket.socket()client.connect(('127.0.0.1',8080))n = 1while True:data = '%s %s'%(current_thread().name, n)n += 1client.send(data.encode('utf-8'))info = client.recv(1024)print(info)if __name__ == '__main__':for i in range(500): # 多線程模擬多客戶的訪問服務器,進行通信循環。t = Thread(target=client)t.start()#原本服務端需要開啟500個線程才能跟500個客戶端通信,現在只需要一個線程就可以扛住500客戶端
#進程下面開多個線程,線程下面再開多個協程,最大化提升軟件運行效率
總結
以上是生活随笔為你收集整理的进程池、线程池、回调函数、协程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Python中的端口协议之基于UDP协议
- 下一篇: 套接字错误搜集