Python 进程(process)
生活随笔
收集整理的這篇文章主要介紹了
Python 进程(process)
小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
1. 進(jìn)程
1.1 進(jìn)程的創(chuàng)建 fork
正在運(yùn)行著的代碼,就稱為進(jìn)程
# 示例:
import os
# 注意: fork 函數(shù),只在 Unix/Linux/Mac 上運(yùn)行, windows 不可以
pid = os.fork()
print(pid)
if pid == 0:
# 子進(jìn)程執(zhí)行(pid==0)
print("=== 子進(jìn)程 === %d %d" % (os.getpid(),os.getppid()))
else:
# 主(父)進(jìn)程執(zhí)行(pid>0)
print("=== 父進(jìn)程 ===%d" % os.getpid())
1.2 全局變量在進(jìn)程中不共享
# 示例:
import os
import time
g_num = 100
ret = os.fork()
if ret == 0:
print("=== process 1 ===")
g_num += 1
print("=== process 1 g_num = %d===" % g_num)
else:
time.sleep(3)
print("=== process 2 ===")
print("=== process 2 g_num = %d ===" % g_num)
# 輸出:
=== process 1 ===
=== process 1 g_num = 101 ===
=== process 2 ===
=== process 2 g_num = 100 ===
1.3 多次fork
# 示例:
import os
ret = os.fork()
if ret == 0:
print("=== 1 ===")
else:
print("=== 2 ===")
ret = os.fork()
if ret == 0:
print("=== 3 ===")
else:
print("=== 4 ===")
# 輸出:
=== 2 ===
=== 4 ===
=== 1 ===
=== 3 ===
=== 4 ===
=== 3 ===
1.4 Process 創(chuàng)建子進(jìn)程
# 示例
from multiprocessing import Process
import os
# 子進(jìn)程要執(zhí)行的代碼
def run_proc(name):
print('子進(jìn)程運(yùn)行中, name=%s, pid=%d...' % (name, os.getpid()))
if __name__=='__main__':
print('父進(jìn)程 %d.' % os.getpid())
p = Process(target=run_proc, args=('test',))
print('子進(jìn)程將要執(zhí)行')
p.start() # 創(chuàng)建子進(jìn)程
p.join() # 等到子進(jìn)程實(shí)例(p)執(zhí)行結(jié)束后,才執(zhí)行后面的代碼
print('子進(jìn)程已結(jié)束')
1.5 Process 的子類
# 示例:
from multiprocessing import Process
import time
import os
# 繼承Process類
class Process_Class(Process):
# 因?yàn)镻rocess類本身也有 __init__ 方法,這個(gè)類相當(dāng)于重寫(xiě)了這個(gè)方法;
# 但這樣就會(huì)帶來(lái)一個(gè)問(wèn)題,我們并沒(méi)有完全的初始化一個(gè) Process 類, 所以就不能使用從這個(gè)類繼承的一些方法
# 最好的方法就是將繼承類本身傳遞給 Process.__init__ 方法, 完成這些初始化操作
def __init__(self, interval):
# super(Process_Class, self).__init__()
Process.__init__(self)
self.interval = interval
# 重寫(xiě)Process類的run()方法
def run(self):
print("子進(jìn)程(%s) 開(kāi)始執(zhí)行, 父進(jìn)程為 (%s)" % (os.getpid(), os.getppid()))
t_start = time.time()
time.sleep(self.interval)
t_stop = time.time()
print("(%s)執(zhí)行結(jié)束, 耗時(shí)%0.2f秒" % (os.getpid(), t_stop-t_start))
if __name__=="__main__":
t_start = time.time()
print("當(dāng)前進(jìn)程(%s)" % os.getpid())
p1 = Process_Class(2)
# 對(duì)一個(gè)不包含target屬性的Process類執(zhí)行start()方法,就會(huì)運(yùn)行這個(gè)類中的 run() 方法
p1.start()
p1.join()
t_stop = time.time()
print("(%s)執(zhí)行結(jié)束, 耗時(shí)%0.2f" % (os.getpid(), t_stop-t_start))
1.6 進(jìn)程池 Pool(非阻塞方式)
# 示例:
from multiprocessing import Pool
import os, time, random
def worker(msg):
t_start = time.time()
print("%s開(kāi)始執(zhí)行, 進(jìn)程號(hào)為%d" % (msg, os.getpid()))
# random.random()隨機(jī)生成0~1之間的浮點(diǎn)數(shù)
time.sleep(random.random()*2)
t_stop = time.time()
print(msg, "執(zhí)行完畢, 耗時(shí)%0.2f" % (t_stop-t_start))
po = Pool(3) # 定義一個(gè)進(jìn)程池, 最大進(jìn)程數(shù) 3
for i in range(0, 10):
# Pool.apply_async(要調(diào)用的目標(biāo),(傳遞給目標(biāo)的參數(shù)元組,))
# 每次循環(huán)將會(huì)用空閑出來(lái)的子進(jìn)程去調(diào)用目標(biāo)
po.apply_async(worker,(i,))
print("=== start ===")
po.close() # 關(guān)閉進(jìn)程池,關(guān)閉后po不再接收新的請(qǐng)求
po.join() # 等待po中所有子進(jìn)程執(zhí)行完成, 必須放在close語(yǔ)句之后
print("=== close ===")
1.7 apply 阻塞方式添加任務(wù)
# 示例:
from multiprocessing import Pool
import os, time, random
def worker(msg):
t_start = time.time()
print("%s開(kāi)始執(zhí)行, 進(jìn)程號(hào)為%d" % (msg, os.getpid()))
# random.random()隨機(jī)生成0~1之間的浮點(diǎn)數(shù)
time.sleep(random.random()*2)
t_stop = time.time()
print(msg, "執(zhí)行完畢, 耗時(shí)%0.2f" % (t_stop-t_start))
po = Pool(3) # 定義一個(gè)進(jìn)程池, 最大進(jìn)程數(shù) 3
for i in range(0, 10):
po.apply(worker, (i,))
print("=== start ===")
po.close()
po.join()
print("=== close ===")
1.8 進(jìn)程間通信(Queue)
# 示例一: Queue 存取數(shù)據(jù)
from multiprocessing import Queue
q=Queue(3) # 初始化一個(gè)Queue對(duì)象,最多可接收三條put消息
q.put("消息1")
q.put("消息2")
print(q.full()) # False
q.put("消息3")
print(q.full()) # True
# 因?yàn)橄㈥?duì)列已滿,下面的try會(huì)拋出異常,第一個(gè)try會(huì)等待2秒后再拋出異常,第二個(gè)try會(huì)立刻拋出異常
try:
q.put("消息4",True,2)
except:
print("消息隊(duì)列已滿,現(xiàn)有消息數(shù)量: %s" % q.qsize())
try:
q.put_nowait("消息4")
except:
print("消息隊(duì)列已滿,現(xiàn)有消息數(shù)量: %s" % q.qsize())
# 推薦的方式: 先判斷消息隊(duì)列是否已滿, 再寫(xiě)入
if not q.full():
q.put_nowait("消息4")
# 讀取消息時(shí), 先判斷消息隊(duì)列是否為空,再讀取
if not q.empty():
for i in range(q.qsize()):
print(q.get_nowait())
# 示例二: 進(jìn)程間通信
# 在父進(jìn)程中創(chuàng)建兩個(gè)子進(jìn)程,一個(gè)往Queue里寫(xiě)數(shù)據(jù), 一個(gè)從Queue里讀數(shù)據(jù)
from multiprocessing import Process, Queue
import os, time, random
# 寫(xiě)數(shù)據(jù)進(jìn)程執(zhí)行代碼
def write(q):
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
# 讀數(shù)據(jù)進(jìn)程執(zhí)行代碼
def read(q):
while True:
if not q.empty():
value = q.get(True)
print('Get %s from queue.' % value)
time.sleep(random.random())
else:
break
if __name__=='__main__':
# 父進(jìn)程創(chuàng)建Queue,并傳給各子進(jìn)程
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 啟動(dòng)子進(jìn)程pw, 寫(xiě)入:
pw.start()
# 等待pw結(jié)束
pw.join()
# 啟動(dòng)子進(jìn)程pr, 讀取:
pr.start()
pr.join()
# 所有子進(jìn)程執(zhí)行結(jié)束,輸出
print('所有數(shù)據(jù)都寫(xiě)入并讀完')
1.8.1 進(jìn)程間通信(Pipe)
# 示例:
from multiprocessing import Process, Pipe
def f(conn):
conn.send([33, None, 'hello'])
conn.close()
if __name__ = '__main__':
parent_conn, child_conn = Pipe()
p = Process(target = f, args = (child_conn, ))
p.start()
print(parent_conn.recv())
p.join()
1.9 進(jìn)程池中的Queue
在進(jìn)程池中,需要使用 multiprocessing.Manager()中的 Queue();而不是 multiprocessing.Queue()
# 示例:
from multiprocessing import Manager, Pool
import os, time, random
# 寫(xiě)數(shù)據(jù)進(jìn)程執(zhí)行代碼
def reader(q):
print("reader啟動(dòng)(%s), 父進(jìn)程為(%s)" % (os.getpid(), os.getppid()))
for i in range(q.qsize()):
print("reader從Queue獲取到消息: %s" %q.get(True))
# 讀數(shù)據(jù)進(jìn)程執(zhí)行代碼:
def writer(q):
print("writer啟動(dòng)(%s), 父進(jìn)程為(%s)" % (os.getpid(), os.getppid()))
for i in ['A', 'B', 'C']:
q.put(i)
if __name__=="__main__":
print("(%s) start" % os.getpid())
q=Manager().Queue() # 使用Manager中的Queue來(lái)初始化
po=Pool()
# 使用阻塞模式創(chuàng)建進(jìn)程,這樣就不需要在reader中使用死循環(huán)了, 可以讓writer執(zhí)行完成后,再用reader讀取
po.apply(writer,(q,))
po.apply(reader,(q,))
po.close()
po.join()
print("(%s) End" % os.getpid())
參考資料:
Python 核心編程
總結(jié)
以上是生活随笔為你收集整理的Python 进程(process)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: VisualSVN Server 配置和
- 下一篇: Web安全测试之XSS(跨站脚本攻击)