python 进程
進(jìn)程 Process
- 正在執(zhí)行中的程序稱為進(jìn)程。進(jìn)程的執(zhí)行會占用內(nèi)存等資源。多個進(jìn)程同時執(zhí)行時,每個進(jìn)程的執(zhí)行都需要由操作系統(tǒng)按一定的算法(RR調(diào)度、優(yōu)先數(shù)調(diào)度算法等)分配內(nèi)存空間
創(chuàng)建一個進(jìn)程第一種方式
# process模塊是一個創(chuàng)建進(jìn)程的模塊,借助這個模塊,就可以完成進(jìn)程的創(chuàng)建。 from multiprocessing import Processdef func(n):# 子進(jìn)程函數(shù)# 獲取當(dāng)前進(jìn)程idprint("當(dāng)前進(jìn)程id:", os.getpid())# 獲取父進(jìn)程idprint("父進(jìn)程id:", os.getppid())for i in range(10):time.sleep(2)print("子進(jìn)程", n)# 子進(jìn)程中的程序相當(dāng)于import的主進(jìn)程中的程序,那么import的時候會不會執(zhí)行你import的那個文件的程序啊,前面學(xué)的,是會執(zhí)行的,所以出現(xiàn)了兩次打印 print("-----------------")# Windows下寫代碼開啟子進(jìn)程時,必須寫上if __name__ == ‘__main__’: if __name__ == "__main__":# 首先我運(yùn)行當(dāng)前這個test.py文件,運(yùn)行這個文件的程序,那么就產(chǎn)生了進(jìn)程,這個進(jìn)程我們稱為主進(jìn)程# 將函數(shù)注冊到一個進(jìn)程中,此時還沒有啟動進(jìn)程,只是創(chuàng)建了一個進(jìn)程對象。并且func是不加括號的。p = Process(target=func, args=("傳參",)) # args指定的為傳給target函數(shù)的位置參數(shù),是一個元組形式,必須有逗號# 告訴操作系統(tǒng),給我開啟一個進(jìn)程,func這個函數(shù)就被我們新開的這個進(jìn)程執(zhí)行了,# 而這個進(jìn)程是我主進(jìn)程運(yùn)行過程中創(chuàng)建出來的,所以稱這個新創(chuàng)建的進(jìn)程為主進(jìn)程的子進(jìn)程,而主進(jìn)程又可以稱為這個新進(jìn)程的父進(jìn)程。p.start() # start并不是直接就去執(zhí)行了,我們知道進(jìn)程有三個狀態(tài),進(jìn)程會進(jìn)入進(jìn)程的三個狀態(tài),就緒,(被調(diào)度,也就是時間片切換到它的時候)執(zhí)行,阻塞,并且在這個三個狀態(tài)之間不斷的轉(zhuǎn)換,等待cpu執(zhí)行時間片到了。p.json() # 等待子進(jìn)程執(zhí)行結(jié)束 主進(jìn)程 才能繼續(xù)往下執(zhí)行# 獲取當(dāng)前進(jìn)程idprint("當(dāng)前進(jìn)程id:", os.getpid())# 獲取父進(jìn)程id Pycharm 進(jìn)程的IDprint("父進(jìn)程id Pycharm 進(jìn)程的ID:", os.getppid())# 這是主進(jìn)程的程序,上面開啟的子進(jìn)程的程序是和主進(jìn)程的程序同時運(yùn)行的,我們稱為異步for i in range(10):time.sleep(1)print("父進(jìn)程")process 類中的 參數(shù)
group # 未使用 值始終為 None target # 表示調(diào)用對象,即子進(jìn)程要執(zhí)行的任務(wù) args # 表示調(diào)用對象的 參數(shù)元祖 args=(..., ) kwargs # 表示調(diào)用對象的字典 kwargs={"name":'張',} name # 為子進(jìn)程的名字 定義子進(jìn)程的名字進(jìn)程對象的 方法
from multiprocessing import Process p = Process(target=func,)p.start() # 啟動進(jìn)程 p.json() # 等待子進(jìn)程執(zhí)行結(jié)束 主進(jìn)程 才能繼續(xù)往下執(zhí)行 p.Terminate() # 關(guān)閉進(jìn)程 不是自己關(guān)閉 而是 給操作系統(tǒng) 發(fā)送了一個關(guān)閉進(jìn)程的信號 p.is_alive() # 查看進(jìn)程是否還活著 p.daemon = True # 設(shè)置進(jìn)程為守護(hù)進(jìn)程 寫在 start()之前 子進(jìn)程會跟父進(jìn)程一起結(jié)束 p.name # 進(jìn)程的名字 p.pid # 進(jìn)程的 id創(chuàng)建進(jìn)行的第二種方式:
- 自己定義一個類,繼承Process類,必須寫一個run方法,想傳參數(shù),自行寫init方法,然后執(zhí)行super父類的init方法
驗(yàn)證進(jìn)程間的空間隔離
import time from multiprocessing import Process# 進(jìn)程之間是空間隔離的,不共享資源 global_num = 100def func1():global global_numglobal_num = 0print('子進(jìn)程全局變量>>>', global_num)if __name__ == '__main__':p1 = Process(target=func1, )p1.start()time.sleep(1)print('主進(jìn)程的全局變量>>>', global_num)僵和孤兒進(jìn)程
- 進(jìn)程結(jié)束后資源回收 主進(jìn)程不會管子進(jìn)程 自己結(jié)束
- 使用 p.json() 主進(jìn)程會等待子進(jìn)程結(jié)束后 才結(jié)束
同步鎖 Lock
同步效率低,但是保證了數(shù)據(jù)安全 重點(diǎn)
搶票案例
import random import json import time from multiprocessing import Process, Lockdef quang(i, lock):print("等待搶票")time.sleep(1)lock.acquire() # 鎖頭 只有一把 with open("aaa", "r") as f:dic = json.load(f)if dic["piao"] > 0:time.sleep(random.random())dic["piao"] -= 1with open("aaa", "w") as f1:json.dump(dic, f1)print("%s強(qiáng)盜了" % i)else:print("%s沒票了" % i)lock.release() # 還鎖 if __name__ == "__main__":lo = Lock()for i in range(10):p = Process(target=quang, args=(i, lo))p.start()
信號量 Semaphore
阿斯蒂芬
import random import time from multiprocessing import Process, Semaphoredef dbj(i, s):# 信息 入口s.acquire() print("%s號顧客來洗腳了" % i)time.sleep(random.randrange(2, 7))# 信息 出口s.release()if __name__ == "__main__":# 信息# 創(chuàng)建一個計數(shù)器,每次acquire就減1,直到減到0,那么上面的任務(wù)只有4個在同時異步的執(zhí)行,后面的進(jìn)程需要等待.s = Semaphore(5)for i in range(20):p = Process(target=dbj, args=(i, s))p.start()
事件 Event
通過事件來完成紅路燈
import random import time from multiprocessing import Process, Eventdef hld(e):while 1:print("紅燈了!!")time.sleep(3)# 將事件狀態(tài)改為 Truee.set()print("綠燈了")time.sleep(5)# 將事件狀態(tài)改為 Falsee.clear()def car(i, e):# e.is_set() 查看事件狀態(tài) True Falseif e.is_set():print("%s號車直接通過" % i)else:print("%s號車等紅燈" % i)# 等待 如果 狀態(tài)為 True 時 向后執(zhí)行e.wait()print("%s號車綠燈通過" % i)if __name__ == "__main__":e = Event()p1 = Process(target=hld, args=(e,))p1.start()for i in range(1000):time.sleep(random.random())p = Process(target=car, args=(i, e))p.start() # 方法 e.is_set() # 查看事件狀態(tài)(通過改變事件狀態(tài)來控制事件其他進(jìn)程的運(yùn)行) e.set() # 將事件 改為 True e.clear() # 將事件改為 False e.wait() # 等待 如果 狀態(tài)為 True 時 向后執(zhí)行
進(jìn)程間通信IPC
水電費(fèi)
import time from multiprocessing import Process,Queuedef girl(q):print('來自boy的信息',q.get())print('來自校領(lǐng)導(dǎo)的凝視',q.get()) def boy(q):q.put('約嗎')if __name__ == '__main__':q = Queue(5)boy_p = Process(target=boy,args=(q,))girl_p = Process(target=girl,args=(q,))boy_p.start()girl_p.start()time.sleep(1)q.put('好好工作,別亂搞')
隊列 Queue #重點(diǎn)
先進(jìn)先出
import random import time from multiprocessing import Process, Event, Queueq = Queue(3)q.put(1) # 將對象放入隊列中 會有細(xì)微的 延遲 q.put(2) print("隊列是否已經(jīng)滿了", q.full()) # 隊列是否已經(jīng)滿了 q.put(3) print("隊列是否已經(jīng)滿了", q.full()) # q.put(3)print(q.get()) # 取數(shù)據(jù) print("隊列是否空了", q.empty()) # 隊列是否空了 print(q.get()) print(q.get()) print("隊列是否空了", q.empty()) print(q.get()) print(q.get(False)) # 判斷隊列是否已經(jīng)空了 空了就報錯 queue.Empty q.qsize() # 獲取隊列當(dāng)前大小 就是已存在的數(shù)據(jù)個數(shù) 不可靠while 1:try:print(q.get(False))except:print("11111")breakdef boy(q):q.put("約嗎")def girl(q):while 1:try:print(q.get(False)) # == q.get_nowait() 如果隊列空則報錯except:passif __name__ == "__main__":q = Queue(5)boy = Process(target=boy, args=(q,))girl = Process(target=girl, args=(q,))boy.start()girl.start()time.sleep(1)q.put("好好學(xué)習(xí)")
生產(chǎn)者消費(fèi)者模型
解耦 緩沖 降低生產(chǎn)者與消費(fèi)者之間的 耦合性
import time from multiprocessing import Process, Queuedef producer(p):for i in range(1, 11):time.sleep(1)print("生產(chǎn)了包子%s" % i)# 將生產(chǎn)的包子添加到隊列中p.put("包子%s" % i)# 生產(chǎn)結(jié)束后在隊列末尾 添加一個空信號p.put(None)def consumer(p,i):while 1:time.sleep(1.5)# 循環(huán) 取出所有元素pp = p.get()if pp:print("%s吃了" % i, pp)else:print("%s吃完了" % i)# 將空信息還回去p.put(None)breakif __name__ == "__main__":q = Queue(10)# 創(chuàng)建pro_p = Process(target=producer, args=(q,))pro_p.start()for i in range(2):con_p = Process(target=consumer, args=(q,i))con_p.start()# p2.join()# p.put(None)
JoinableQueue
JoinableQueue的生產(chǎn)者消費(fèi)者模型
- import time from multiprocessing import Process, JoinableQueue# 生產(chǎn)者 def producer(q):for i in range(10):time.sleep(0.5)# 創(chuàng)建10 個包子裝進(jìn)隊列中q.put("包子%s號" % i)print("生產(chǎn)了 包子%s" % i)# 等待隊列中所有內(nèi)容被取走后 關(guān)閉本進(jìn)程q.join()print("包子賣完了")# 消費(fèi)者 def consumer(q):while 1:time.sleep(1)# 循環(huán)吃包子print("吃了 %s" % q.get())# 每取 一個元素 就給 q.join 傳遞一個信號 記錄取出個數(shù)q.task_done() if __name__ == "__main__":# 實(shí)現(xiàn) JoinableQueue 隊列 對象q = JoinableQueue(20)# 將 生產(chǎn)者加入進(jìn)程pro_p = Process(target=producer, args=(q,))pro_p.start()# 將 消費(fèi)者 加入進(jìn)程con_p = Process(target=consumer, args=(q,))# 將消費(fèi)者設(shè)置成守護(hù)進(jìn)程 隨 住程序一起關(guān)閉con_p.daemon = Truecon_p.start()# 等待進(jìn)程 執(zhí)行完閉 主程序才能關(guān)閉pro_p.join()print("關(guān)閉程序!")
管道 pipe
管道是不安全的
- from multiprocessing import Process, Pipe, Manager, Lock, Pooldef func(conn2):try:print(conn2.recv())print(conn2.recv())except EOFError:print("管道已經(jīng)關(guān)閉了")conn2.close()if __name__ == '__main__':try:conn1, conn2 = Pipe() # 在創(chuàng)建 Process 對象之前創(chuàng)建管道p = Process(target=func, args=(conn2,))p.start()conn1.send("asdad")conn1.close()conn1.recv()p.join()except OSError:print("管道關(guān)閉>>>>>>>>>>")# 方法 recv() 接收 send() 發(fā)送 #- 管道默認(rèn)是雙工的 # 設(shè)置為 單工 參數(shù): duplex=False 改為單工 conn1 發(fā)送 conn2 接收 如果另一端已經(jīng)關(guān)閉 則 recv() 接收會報錯
數(shù)據(jù)共享 Manager
多進(jìn)程同時操作一個文件的數(shù)據(jù) 不加鎖就會出現(xiàn)錯誤數(shù)據(jù)
共享: 可以將一個數(shù)據(jù)傳遞到進(jìn)程中 在不同的 作用于中 進(jìn)程可對其進(jìn)行修改
import time, os from multiprocessing import Process, Manager, Lock'''資源共享'''def func(mm):mm["name"] = "張飛"if __name__ == '__main__':m = Manager()mm = m.dict({"name": "aaaa"})print(mm)p = Process(target=func, args=(mm,))p.start()p.join()print(mm)def func(m_d, ml):with ml:m_d["count"] -= 1if __name__ == '__main__':m = Manager()ml = Lock()m_d = m.dict({"count": 100})lis = []for i in range(20):p = Process(target=func, args=(m_d, ml))p.start()lis.append(p)[i.join() for i in lis]print(m_d)
進(jìn)程池 Pool
- import time from multiprocessing import Process,Pooldef func(n):for i in range(5):n = n + iprint(n)if __name__ == '__main__':#驗(yàn)證一下傳參pool = Pool(4)pool.map(func,range(100)) #map自帶join功能,異步執(zhí)行任務(wù),參數(shù)是可迭代的p_list = []for i in range(200):p1 = Process(target=func,args=(i,))p1.start()p_list.append(p1)[p.join() for p in p_list]
-
def func(n):print(n)time.sleep(1)return n * nif __name__ == '__main__':pool = Pool(4) # 進(jìn)程池 的個數(shù)lis = []for i in range(10):# res = p.apply(fun,args=(i,)) #同步執(zhí)行的方法,他會等待你的任務(wù)的返回結(jié)果,# 異步運(yùn)行,根據(jù)進(jìn)程池中有的進(jìn)程數(shù),每次最多3個子進(jìn)程在異步執(zhí)行,并且可以執(zhí)行不同的任務(wù),傳送任意的參數(shù)了。# 返回結(jié)果之后,將結(jié)果放入列表,歸還進(jìn)程,之后再執(zhí)行新的任務(wù)# 需要注意的是,進(jìn)程池中的三個進(jìn)程不會同時開啟或者同時結(jié)束# 而是執(zhí)行完一個就釋放一個進(jìn)程,這個進(jìn)程就去接收新的任務(wù)。ret = pool.apply_async(func, args=(i,)) # 異步執(zhí)行的方法,他會等待你的任務(wù)的返回結(jié)果,# print(ret.get())lis.append(ret)# print(lis)pool.close() # 不是關(guān)閉進(jìn)程池,而是不允許再有其他任務(wù)來使用進(jìn)程池pool.join() # 這是感知進(jìn)程池中任務(wù)的方法,等待進(jìn)程池的任務(wù)全部執(zhí)行完pool.ready() # 如果調(diào)用完成 返回 Truepool.terminate() # 立即終止所有進(jìn)程for i in lis:print(i.get()) #使用get來獲取apply_aync的結(jié)果,如果是apply,則沒有g(shù)et方法,因?yàn)閍pply是同步執(zhí)行,立刻獲取結(jié)果,也根本無需getprint("主程序結(jié)束")
map 傳參
import time from multiprocessing import Process,Pooldef func(n):print(n)if __name__ == '__main__':pool = Pool(4)# pool.map(func,range(100)) #參數(shù)是可迭代的pool.map(func,['sb',(1,2)]) #參數(shù)是可迭代的# pool.map(func,range(100)) #map自帶join功能,異步執(zhí)行任務(wù),參數(shù)是可迭代的回調(diào)函數(shù) callback
回調(diào)函數(shù)的形參只能有一個 如果執(zhí)行函數(shù)有多個返回值 那么 回調(diào)函數(shù) 接收的是一個元祖 包含所有執(zhí)行函數(shù)的返回值
import time, os from multiprocessing import Process, Pooldef func1(n):print(os.getpid())n += 10return ndef func2(nn):# 回調(diào)函數(shù) 接收 func1 的返回值print(os.getpid())print(nn)return 10if __name__ == '__main__':pool = Pool()print(os.getpid())lis = []c = pool.apply_async(func1, args=(1,), callback=func2) # 將func1 反回的結(jié)果交給func2 執(zhí)行pool.close() # 不是關(guān)閉進(jìn)程池,而是不允許再有其他任務(wù)來使用進(jìn)程池pool.join() # 這是感知進(jìn)程池中任務(wù)的方法,等待進(jìn)程池的任務(wù)全部執(zhí)行完
多進(jìn)程爬蟲
from multiprocessing import Process, Pool from urllib.request import urlopen import ssl, re # ?掉數(shù)字簽名證書 ssl._create_default_https_context = ssl._create_unverified_contexturls = ['https://www.baidu.com','https://www.python.org','https://www.openstack.org','https://help.github.com/','http://www.sina.com.cn/' ] def func1(path):# 打開網(wǎng)址condent = urlopen(path)# 返回網(wǎng)頁源代碼return condent.read().decode("utf-8")def func2(content):com = re.compile(r"<a(?P<aaa>.*?)</a>")cont = re.findall(com, content)print(cont)if __name__ == '__main__':pool = Pool()for path in urls:tar = pool.apply_async(func1, args=(path,), callback=func2)pool.close()pool.join()轉(zhuǎn)載于:https://www.cnblogs.com/zhang-zi-yi/p/10755810.html
總結(jié)
- 上一篇: package 与 package-lo
- 下一篇: python写炒股软件_使用Python