日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程语言 > python >内容正文

python

python并发编程3-进程

發(fā)布時(shí)間:2025/3/15 python 16 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python并发编程3-进程 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

復(fù)習(xí):

# 鎖 # 多個(gè)進(jìn)程在同一時(shí)間只有一個(gè)進(jìn)程能進(jìn)入代碼去執(zhí)行# 信號(hào)量 Semaphore from multiprocessing import Semaphore # 用鎖的原理實(shí)現(xiàn)的。內(nèi)置了一個(gè)計(jì)數(shù)器 #在同一時(shí)間 只能有指定數(shù)量的進(jìn)程執(zhí)行某一段被控制住的代碼#事件 # wait阻塞收到事件狀態(tài)控制的同步組件 # 狀態(tài) True False 利用is_set查看#true-->false clear()#false-->true set() #wait 狀態(tài)為T(mén)rue不阻塞,狀態(tài)為False的時(shí)候阻塞# 鎖 多個(gè)進(jìn)程在同一時(shí)間只有一個(gè)進(jìn)程能進(jìn)入代碼去執(zhí)行 # 信號(hào)量 在同一時(shí)間 只能有指定數(shù)量的進(jìn)程執(zhí)行某一段被控制住的代碼 # 事件 進(jìn)程要執(zhí)行必須等通知 # 進(jìn)程池 內(nèi)有固定數(shù)目的進(jìn)程。輪流利用進(jìn)程#隊(duì)列 # Queue # put() 當(dāng)隊(duì)列滿(mǎn)的時(shí)候阻塞等待隊(duì)列有空位置 # get() 當(dāng)隊(duì)列空的時(shí)候阻塞等待隊(duì)列有數(shù)據(jù) # full() empty() 不太準(zhǔn)確 # JoinableQueue# get() task_done# put() join 等待數(shù)據(jù)取完并處理完

?管道

案例1:

def func(conn):conn.send('hello') if __name__ =='__main__':conn1, conn2 = Pipe()Process(target=func,args=(conn1,)).start()print(conn2.recv())#運(yùn)行結(jié)果: # hello

案例2:

def func(conn1,conn2):conn2.close() # 將不用的鏈接關(guān)閉while True:try:msg=conn1.recv()# if msg is None:breakprint(msg)except EOFError:conn1.close()break if __name__ =='__main__':conn1, conn2 = Pipe()Process(target=func,args=(conn1,conn2)).start() #參數(shù)必須將conn1,conn2都傳過(guò)去conn1.close()for i in range(5):conn2.send('hello')# conn2.send(None)conn2.close()

運(yùn)行結(jié)果:

管道實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型:
?

from multiprocessing import Process,Pipe,Lock import time import random def producer(con,pro,name,food):con.close()for i in range(5):time.sleep(random.random())f='%s生產(chǎn)%s第%i個(gè)'%(name,food,i)print(f)pro.send(f)pro.close() def consumer(con,pro,name):pro.close()while True:try:food=con.recv()print('%s吃了%s'%(name,food))time.sleep(random.random())except EOFError:con.close()breakif __name__ =='__main__':con,pro=Pipe()p=Process(target=producer,args=(con,pro,'egon','餃子'))p.start()c=Process(target=consumer,args=(con,pro,'alex'))c.start()c1 = Process(target=consumer, args=(con, pro, 'Tony'))c1.start()con.close()pro.close()# Pipe 數(shù)據(jù)不安全性 多個(gè)消費(fèi)者同時(shí)取一個(gè)數(shù)據(jù) # 管道是進(jìn)程數(shù)據(jù)不安全的 利用加鎖 # IPC

運(yùn)行結(jié)果:

管道實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型-改進(jìn)

from multiprocessing import Process,Pipe,Lock import time import random def producer(con,pro,n):con.close()for i in range(n):pro.send(i)pro.send(None)pro.send(None)pro.close()def consumer(lock,con,pro,name):pro.close()while True:lock.acquire()food=con.recv()lock.release()if food:print('%s吃了%s'%(name,food))time.sleep(random.random())else:con.close()breakif __name__ =='__main__':con,pro=Pipe()lock=Lock()p=Process(target=producer,args=(con,pro,10))p.start()c=Process(target=consumer,args=(lock,con,pro,'alex'))c.start()c1 = Process(target=consumer, args=(lock,con, pro, 'Tony'))c1.start()con.close()pro.close()# Pipe 數(shù)據(jù)不安全性 多個(gè)消費(fèi)者同時(shí)取一個(gè)數(shù)據(jù) # 管道是進(jìn)程數(shù)據(jù)不安全的 利用加鎖 # IPC # 隊(duì)列 進(jìn)程之間數(shù)據(jù)安全 # 隊(duì)列=管道+鎖

運(yùn)行結(jié)果:

進(jìn)程間數(shù)據(jù)共享

from multiprocessing import Manager,Process,Lock def main(dict,lock):lock.acquire()dict['count']-=1lock.release() if __name__ =='__main__':m=Manager() # object 對(duì)象l=Lock()dict=m.dict({'count':100})p_list=[]for i in range(50):p=Process(target=main,args=(dict,l))p.start()p_list.append(p)for i in p_list:i.join()print('主進(jìn)程:',dict)

運(yùn)行結(jié)果:

進(jìn)程池-map

# 為什么會(huì)有進(jìn)程池的概念#效率# 每開(kāi)啟進(jìn)程,開(kāi)啟屬于這個(gè)進(jìn)程的內(nèi)存空間# 寄存器 堆棧 文件# 進(jìn)程過(guò)多 操作系統(tǒng)的調(diào)度 # 進(jìn)程池# python中的 先創(chuàng)建一個(gè)屬于進(jìn)程的池子# 這個(gè)池子指定能存放多少個(gè)進(jìn)程# 先將這些進(jìn)程創(chuàng)建好 # 更高級(jí)的進(jìn)程池 python沒(méi)有# n,m# n 只起n個(gè)進(jìn)程# 加進(jìn)程# 一直加到上限m個(gè)# 進(jìn)程空閑時(shí),再減少進(jìn)程,一直減到n個(gè)。。。 from multiprocessing import Pool,Process def func(n):for i in range(2):print(n+1) def func2(n):# n[0]# n[1]for i in range(2):print(n[0]) # Process 超過(guò)5個(gè)進(jìn)程,就用進(jìn)程池。一般個(gè)數(shù)是cpu+1 import time if __name__ =='__main__':start=time.time()pool=Pool(5) # 5個(gè)進(jìn)程pool.map(func,range(6)) #100個(gè)任務(wù),自帶join#pool.map(func2,range(100)) # 100個(gè)任務(wù),自帶joinpool.map(func2, [('alex',1),'egon']) # 100個(gè)任務(wù),自帶joint1=time.time()-startstart=time.time()# 等價(jià)于:多進(jìn)程p_list=[]for i in range(6):p=Process(target=func,args=(i,))p_list.append(p)p.start()for p in p_list:p.join()t2 = time.time() - startprint(t1,t2)

運(yùn)行結(jié)果:

進(jìn)程池-apply

from multiprocessing import Pool,Process import time,os def func(n):print('start func%s'%n,os.getpid())time.sleep(1)print('end func%s'%n,os.getpid())if __name__ =='__main__':p=Pool(5)# for i in range(10):# p.apply(func,args=(i,)) # 同步for i in range(10):p.apply_async(func, args=(i,)) # 異步。主進(jìn)程結(jié)束就結(jié)束了。沒(méi)有等子進(jìn)程結(jié)束p.close() # 結(jié)束進(jìn)程池接收任務(wù)p.join() # 感知進(jìn)程池中的任務(wù)執(zhí)行結(jié)束。然而進(jìn)程池中的進(jìn)程永遠(yuǎn)是活著的

運(yùn)行結(jié)果:

進(jìn)程池的返回值

# p=Pool() # p.map(funcname,iterable) 默認(rèn)異步的執(zhí)行任務(wù),且自帶close和join # p.apply() 同步調(diào)用 # p.apply_async 異步調(diào)用,和主進(jìn)程完全異步 需要手動(dòng)close和join'''apply''' # from multiprocessing import Pool # def func(i): # return i*i # # if __name__ =='__main__': # p=Pool(5) # 如果不寫(xiě)5,默認(rèn)為cpu個(gè)數(shù) # for i in range(10): # res=p.apply(func,args=(i,)) # print(res)'''apply_async異步''' # from multiprocessing import Pool # import time # def func(i): # time.sleep(0.5) # return i*i # # if __name__ =='__main__': # p=Pool(5) # 如果不寫(xiě)5,默認(rèn)為cpu個(gè)數(shù) # for i in range(10): # res=p.apply_async(func,args=(i,)) # print(res.get()) # 阻塞等待結(jié)果'''apply_async異步''' # from multiprocessing import Pool # import time # def func(i): # time.sleep(0.5) # return i*i # # if __name__ =='__main__': # p=Pool(5) # 如果不寫(xiě)5,默認(rèn)為cpu個(gè)數(shù) # res_list=[] # for i in range(10): # res=p.apply_async(func,args=(i,)) # res_list.append(res) # # print(res.get()) # 阻塞等待結(jié)果 # print(123) # for res in res_list:print(res.get())#map異步 from multiprocessing import Pool import time def func(i):time.sleep(0.5)return i*iif __name__ =='__main__':p=Pool(5) # 如果不寫(xiě)5,默認(rèn)為cpu個(gè)數(shù)ret=p.map(func,range(10))print(ret)

運(yùn)行結(jié)果:

進(jìn)程池的回調(diào)函數(shù)

# 回調(diào)函數(shù) import os from multiprocessing import Pool def func1(n):print('func1:',os.getpid())print('in func1')return n*ndef func2(nn):print('func2:',os.getpid())print('in func2')print(nn)if __name__ =='__main__':print('主進(jìn)程:', os.getpid())p=Pool(5)p.apply_async(func1,args=(8,),callback=func2)p.close()p.join()

運(yùn)行結(jié)果:

回調(diào)函數(shù)2:

# 回調(diào)函數(shù) 爬蟲(chóng)用到 # 爬蟲(chóng) 訪問(wèn)一個(gè)網(wǎng)址,將數(shù)據(jù)從網(wǎng)址上下載下來(lái)。數(shù)據(jù)是bytes轉(zhuǎn)成字符串。處理字符串 from multiprocessing import Pool def func1(n):return n+1 def func2(m):print(m) if __name__ == '__main__':p=Pool(5)for i in range(10,20):p.apply_async(func1,args=(i,),callback=func2)p.close()p.join()

?運(yùn)行結(jié)果:

回調(diào)函數(shù)爬蟲(chóng)例子

import requests from urllib.request import urlopen from multiprocessing import Pool # response=requests.get('http://www.baidu.com') # print(response) # print(response.__dict__) # print(response.status_code) # print(response.content.decode('utf-8'))def get(url):response=requests.get(url)if response.status_code==200:return url,response.content.decode('utf-8')def get_urllib(url):ret=urlopen(url)return ret.read().decode('utf-8')def call_back(args):url,content=argsprint(url,len(content))if __name__ =='__main__':url_list=['http://www.cnblogs.com','http://www.baidu.com','http://www.sogou.com','http://www.suhu.com',]p=Pool(5)for url in url_list:p.apply_async(get,args=(url,),callback=call_back)p.close()p.join()

運(yùn)行結(jié)果:

使用進(jìn)程池qq聊天:

server端:
?

import socket from multiprocessing import Pool def func(conn):conn.send(b'hello')print(conn.recv(1024).decode('utf-8'))conn.close()if __name__ == '__main__':p=Pool(5)sk=socket.socket()sk.bind(('127.0.0.1',8080))sk.listen()while True:conn, addr = sk.accept()p.apply_async(func,args=(conn,))sk.close()

client端:

import socketsk=socket.socket() sk.connect(('127.0.0.1',8080))ret=sk.recv(1024).decode('utf-8') print(ret) msg=input('>>>').encode('utf-8') sk.send(msg) sk.close()

運(yùn)行結(jié)果:

總結(jié)

以上是生活随笔為你收集整理的python并发编程3-进程的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。