python 进程池阻塞和非阻塞_python 之 并发编程(进程池与线程池、同步异步阻塞非阻塞、线程queue)...
9.11 進程池與線程池
池子使用來限制并發的任務數目,限制我們的計算機在一個自己可承受的范圍內去并發地執行任務
池子內什么時候裝進程:并發的任務屬于計算密集型池子內什么時候裝線程:并發的任務屬于IO密集型
進程池:
from concurrent.futures importProcessPoolExecutor,ThreadPoolExecutorimporttime,os,random
?deftask(x):print('%s 接客' %os.getpid())
time.sleep(random.randint(2,5))return x**2?if __name__ == '__main__': #ProcessPoolExecutor創建并開啟指定數目的進程
p=ProcessPoolExecutor() #默認開啟的進程數是cpu的核數
?for i in range(20):
p.submit(task,i)#一下并行執行四個任務,等其中一個任務執行完后再執行下一個
線程池:
from concurrent.futures importProcessPoolExecutor,ThreadPoolExecutorimporttime,os,random
?deftask(x):print('%s 接客' %x)
time.sleep(random.randint(2,5))return x**2?if __name__ == '__main__': #ThreadPoolExecutor創建并開啟指定數目的線程
p=ThreadPoolExecutor(4) #默認開啟的線程數是cpu的核數*5
?for i in range(20):
p.submit(task,i)#一下并發執行四個任務,等其中一個任務執行完后再并發執行下一個
9.112 基于多線程實現并發的套接字通信(使用線程池)
服務端:
from socket import *
from threading importThreadfrom concurrent.futures importProcessPoolExecutor,ThreadPoolExecutor
?
tpool=ThreadPoolExecutor(3) #ThreadPoolExecutor創建并開啟指定數目的線程
defcommunicate(conn,client_addr):while True: #通訊循環
try:
data= conn.recv(1024)if not data: breakconn.send(data.upper())exceptConnectionResetError:breakconn.close()
?defserver():
server=socket(AF_INET,SOCK_STREAM)
server.bind(('127.0.0.1',8080))
server.listen(5)
?while True: #鏈接循環
conn,client_addr=server.accept()print(client_addr)#t=Thread(target=communicate,args=(conn,client_addr))
#t.start()
tpool.submit(communicate,conn,client_addr)#一下并發執行3個任務,等其中一個任務執行完后再并發執行下一個
server.close()
?if __name__ == '__main__':
server()
View Code
客戶端:
from socket import *client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))
?whileTrue:
msg=input('>>>:').strip()if not msg:continueclient.send(msg.encode('utf-8'))
data=client.recv(1024)print(data.decode('utf-8'))
?
client.close()
View Code
9.12 同步異步阻塞非阻塞
阻塞與非阻塞指的是程序的兩種運行狀態:
阻塞:遇到 I/O 就發生阻塞,程序一旦遇到阻塞操作就會停在原地,并且立刻釋放CPU資源
非阻塞(就緒態或運行態):沒有遇到 I/O 操作,或者通過某種手段讓程序即便是遇到 I/O 操作也不會停在原地,執行其他操作,力求盡可能多的占有CPU
同步與異步指的是提交任務的兩種方式:
同步調用:提交完任務后,就在原地等待,直到任務運行完畢后,拿到任務的返回值,才繼續執行下一行代碼
異步調用:提交完任務后,不在原地等待,直接執行下一行代碼
from concurrent.futures importProcessPoolExecutor,ThreadPoolExecutorimporttime,os,random#from multiprocessing import Pool
deftask(x):print('%s 接客' %x)
time.sleep(random.randint(1,3))return x**2?if __name__ == '__main__':#異步調用
p=ThreadPoolExecutor(4) #默認開啟的線程數是cpu的核數*5
obj_l=[]for i in range(10):
obj=p.submit(task,i)
obj_l.append(obj)
?#p.close()
#p.join()
p.shutdown(wait=True)#shutdown指的是不能再往進程池內提交任務,wait=True指等待進程池或線程池內所有的任務都運行完畢
print(obj_l[3].result()) #9 #最后拿結果
print('主')
?#同步調用
p=ThreadPoolExecutor(4) #默認開啟的線程數是cpu的核數*5
for i in range(10):print(p.submit(task,i).result())print('主')
9.121 異步調用+回調機制
問題:
1、任務的返回值不能得到及時的處理,必須等到所有任務都運行完畢才能統一進行處理
2、解析的過程是串行執行的,如果解析一次需要花費2s,解析9次則需要花費18s
基于進程池:
from concurrent.futures importProcessPoolExecutor,ThreadPoolExecutorimportrequestsimportosimporttimeimportrandom
?defget(url):print('%s GET %s' %(os.getpid(),url))
response=requests.get(url)
time.sleep(random.randint(1,3))if response.status_code == 200:returnresponse.text
?def pasrse(obj): #干解析的活
res=obj.result() #回調拿結果
print('%s 解析結果為:%s' %(os.getpid(),len(res))) #4108 解析結果為:2443
?if __name__ == '__main__':
urls=['https://www.baidu.com','https://www.baidu.com','https://www.baidu.com','https://www.baidu.com','https://www.baidu.com','https://www.baidu.com','https://www.baidu.com','https://www.baidu.com','https://www.python.org',
]
?
pool=ProcessPoolExecutor(4)for url inurls:
obj=pool.submit(get,url) #parse函數會在obj對應的任務執行完畢后自動執行,會把obj自動傳給parse
obj.add_done_callback(pasrse) #四個進程并發爬取信息,主進程在執行解析操作
?print('主進程',os.getpid()) #主進程 4108
View Code
基于線程池:
from concurrent.futures importProcessPoolExecutor,ThreadPoolExecutorfrom threading importcurrent_threadimportrequestsimportosimporttimeimportrandom
?defget(url):print('%s GET %s' %(current_thread().name,url))
response=requests.get(url)
time.sleep(random.randint(1,3))if response.status_code == 200:returnresponse.text
?def pasrse(obj): #干解析的活
res=obj.result()print('%s 解析結果為:%s' %(current_thread().name,len(res)))#ThreadPoolExecutor-0_1 解析結果為:
#2443
if __name__ == '__main__': #ThreadPoolExecutor-0_3 解析結果為:2443
urls=['https://www.baidu.com','https://www.baidu.com','https://www.baidu.com','https://www.baidu.com','https://www.baidu.com','https://www.baidu.com','https://www.baidu.com','https://www.baidu.com','https://www.python.org',
]
?
pool=ThreadPoolExecutor(4)for url inurls:
obj=pool.submit(get,url) #parse函數會在obj對應的任務執行完畢后自動執行,會把obj自動傳給parse
obj.add_done_callback(pasrse) #四個線程并發爬取信息,空閑者執行解析操作
print('主線程',current_thread().name) #主線程 MainThread
View Code
9.13 線程queue
隊列:先進先出 queue.Queue()
importqueue
q=queue.Queue(3)
?
q.put(1)
q.put(2)
q.put(3)#q.put(4) 阻塞
?print(q.get()) #1
print(q.get()) #2
print(q.get()) #3
堆棧:后進先出 queue.LifoQueue()
importqueue
q=queue.LifoQueue(3)
?
q.put('a')
q.put('b')
q.put('c')
?print(q.get()) #c
print(q.get()) #b
print(q.get()) #a
優先級隊列:可以以小元組的形式往隊列里存值,第一個元素代表優先級,數字越小優先級越高
PriorityQueue()
importqueue
q=queue.PriorityQueue(3)
q.put((10,'user1'))
q.put((-3,'user2'))
q.put((-2,'user3'))
?print(q.get()) #(-3, 'user2')
print(q.get()) #(-2, 'user3')
print(q.get()) #(10, 'user1')
總結
以上是生活随笔為你收集整理的python 进程池阻塞和非阻塞_python 之 并发编程(进程池与线程池、同步异步阻塞非阻塞、线程queue)...的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 蓝桥杯青少创意编程python组
- 下一篇: anaconda和python的区别_a