日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

python 进程池阻塞和非阻塞_python 之 并发编程(进程池与线程池、同步异步阻塞非阻塞、线程queue)...

發布時間:2025/3/15 python 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python 进程池阻塞和非阻塞_python 之 并发编程(进程池与线程池、同步异步阻塞非阻塞、线程queue)... 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

9.11 進程池與線程池

池子使用來限制并發的任務數目,限制我們的計算機在一個自己可承受的范圍內去并發地執行任務

池子內什么時候裝進程:并發的任務屬于計算密集型池子內什么時候裝線程:并發的任務屬于IO密集型

進程池:

from concurrent.futures importProcessPoolExecutor,ThreadPoolExecutorimporttime,os,random

?deftask(x):print('%s 接客' %os.getpid())

time.sleep(random.randint(2,5))return x**2?if __name__ == '__main__': #ProcessPoolExecutor創建并開啟指定數目的進程

p=ProcessPoolExecutor() #默認開啟的進程數是cpu的核數

?for i in range(20):

p.submit(task,i)#一下并行執行四個任務,等其中一個任務執行完后再執行下一個

線程池:

from concurrent.futures importProcessPoolExecutor,ThreadPoolExecutorimporttime,os,random

?deftask(x):print('%s 接客' %x)

time.sleep(random.randint(2,5))return x**2?if __name__ == '__main__': #ThreadPoolExecutor創建并開啟指定數目的線程

p=ThreadPoolExecutor(4) #默認開啟的線程數是cpu的核數*5

?for i in range(20):

p.submit(task,i)#一下并發執行四個任務,等其中一個任務執行完后再并發執行下一個

9.112 基于多線程實現并發的套接字通信(使用線程池)

服務端:

from socket import *

from threading importThreadfrom concurrent.futures importProcessPoolExecutor,ThreadPoolExecutor

?

tpool=ThreadPoolExecutor(3) #ThreadPoolExecutor創建并開啟指定數目的線程

defcommunicate(conn,client_addr):while True: #通訊循環

try:

data= conn.recv(1024)if not data: breakconn.send(data.upper())exceptConnectionResetError:breakconn.close()

?defserver():

server=socket(AF_INET,SOCK_STREAM)

server.bind(('127.0.0.1',8080))

server.listen(5)

?while True: #鏈接循環

conn,client_addr=server.accept()print(client_addr)#t=Thread(target=communicate,args=(conn,client_addr))

#t.start()

tpool.submit(communicate,conn,client_addr)#一下并發執行3個任務,等其中一個任務執行完后再并發執行下一個

server.close()

?if __name__ == '__main__':

server()

View Code

客戶端:

from socket import *client=socket(AF_INET,SOCK_STREAM)

client.connect(('127.0.0.1',8080))

?whileTrue:

msg=input('>>>:').strip()if not msg:continueclient.send(msg.encode('utf-8'))

data=client.recv(1024)print(data.decode('utf-8'))

?

client.close()

View Code

9.12 同步異步阻塞非阻塞

阻塞與非阻塞指的是程序的兩種運行狀態:

阻塞:遇到 I/O 就發生阻塞,程序一旦遇到阻塞操作就會停在原地,并且立刻釋放CPU資源

非阻塞(就緒態或運行態):沒有遇到 I/O 操作,或者通過某種手段讓程序即便是遇到 I/O 操作也不會停在原地,執行其他操作,力求盡可能多的占有CPU

同步與異步指的是提交任務的兩種方式:

同步調用:提交完任務后,就在原地等待,直到任務運行完畢后,拿到任務的返回值,才繼續執行下一行代碼

異步調用:提交完任務后,不在原地等待,直接執行下一行代碼

from concurrent.futures importProcessPoolExecutor,ThreadPoolExecutorimporttime,os,random#from multiprocessing import Pool

deftask(x):print('%s 接客' %x)

time.sleep(random.randint(1,3))return x**2?if __name__ == '__main__':#異步調用

p=ThreadPoolExecutor(4) #默認開啟的線程數是cpu的核數*5

obj_l=[]for i in range(10):

obj=p.submit(task,i)

obj_l.append(obj)

?#p.close()

#p.join()

p.shutdown(wait=True)#shutdown指的是不能再往進程池內提交任務,wait=True指等待進程池或線程池內所有的任務都運行完畢

print(obj_l[3].result()) #9 #最后拿結果

print('主')

?#同步調用

p=ThreadPoolExecutor(4) #默認開啟的線程數是cpu的核數*5

for i in range(10):print(p.submit(task,i).result())print('主')

9.121 異步調用+回調機制

問題:

1、任務的返回值不能得到及時的處理,必須等到所有任務都運行完畢才能統一進行處理

2、解析的過程是串行執行的,如果解析一次需要花費2s,解析9次則需要花費18s

基于進程池:

from concurrent.futures importProcessPoolExecutor,ThreadPoolExecutorimportrequestsimportosimporttimeimportrandom

?defget(url):print('%s GET %s' %(os.getpid(),url))

response=requests.get(url)

time.sleep(random.randint(1,3))if response.status_code == 200:returnresponse.text

?def pasrse(obj): #干解析的活

res=obj.result() #回調拿結果

print('%s 解析結果為:%s' %(os.getpid(),len(res))) #4108 解析結果為:2443

?if __name__ == '__main__':

urls=['https://www.baidu.com','https://www.baidu.com','https://www.baidu.com','https://www.baidu.com','https://www.baidu.com','https://www.baidu.com','https://www.baidu.com','https://www.baidu.com','https://www.python.org',

]

?

pool=ProcessPoolExecutor(4)for url inurls:

obj=pool.submit(get,url) #parse函數會在obj對應的任務執行完畢后自動執行,會把obj自動傳給parse

obj.add_done_callback(pasrse) #四個進程并發爬取信息,主進程在執行解析操作

?print('主進程',os.getpid()) #主進程 4108

View Code

基于線程池:

from concurrent.futures importProcessPoolExecutor,ThreadPoolExecutorfrom threading importcurrent_threadimportrequestsimportosimporttimeimportrandom

?defget(url):print('%s GET %s' %(current_thread().name,url))

response=requests.get(url)

time.sleep(random.randint(1,3))if response.status_code == 200:returnresponse.text

?def pasrse(obj): #干解析的活

res=obj.result()print('%s 解析結果為:%s' %(current_thread().name,len(res)))#ThreadPoolExecutor-0_1 解析結果為:

#2443

if __name__ == '__main__': #ThreadPoolExecutor-0_3 解析結果為:2443

urls=['https://www.baidu.com','https://www.baidu.com','https://www.baidu.com','https://www.baidu.com','https://www.baidu.com','https://www.baidu.com','https://www.baidu.com','https://www.baidu.com','https://www.python.org',

]

?

pool=ThreadPoolExecutor(4)for url inurls:

obj=pool.submit(get,url) #parse函數會在obj對應的任務執行完畢后自動執行,會把obj自動傳給parse

obj.add_done_callback(pasrse) #四個線程并發爬取信息,空閑者執行解析操作

print('主線程',current_thread().name) #主線程 MainThread

View Code

9.13 線程queue

隊列:先進先出 queue.Queue()

importqueue

q=queue.Queue(3)

?

q.put(1)

q.put(2)

q.put(3)#q.put(4) 阻塞

?print(q.get()) #1

print(q.get()) #2

print(q.get()) #3

堆棧:后進先出 queue.LifoQueue()

importqueue

q=queue.LifoQueue(3)

?

q.put('a')

q.put('b')

q.put('c')

?print(q.get()) #c

print(q.get()) #b

print(q.get()) #a

優先級隊列:可以以小元組的形式往隊列里存值,第一個元素代表優先級,數字越小優先級越高

PriorityQueue()

importqueue

q=queue.PriorityQueue(3)

q.put((10,'user1'))

q.put((-3,'user2'))

q.put((-2,'user3'))

?print(q.get()) #(-3, 'user2')

print(q.get()) #(-2, 'user3')

print(q.get()) #(10, 'user1')

總結

以上是生活随笔為你收集整理的python 进程池阻塞和非阻塞_python 之 并发编程(进程池与线程池、同步异步阻塞非阻塞、线程queue)...的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。