日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 综合教程 >内容正文

综合教程

Python 进程(process)

發(fā)布時(shí)間:2024/6/21 综合教程 27 生活家
生活随笔 收集整理的這篇文章主要介紹了 Python 进程(process) 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

1. 進(jìn)程

1.1 進(jìn)程的創(chuàng)建 fork

正在運(yùn)行著的代碼,就稱為進(jìn)程

# 示例:
import os

# 注意: fork 函數(shù),只在 Unix/Linux/Mac 上運(yùn)行, windows 不可以
pid = os.fork()
print(pid)
if pid == 0:
    # 子進(jìn)程執(zhí)行(pid==0)
    print("=== 子進(jìn)程 === %d %d" % (os.getpid(),os.getppid()))
else:
    # 主(父)進(jìn)程執(zhí)行(pid>0)
    print("=== 父進(jìn)程 ===%d" % os.getpid())

1.2 全局變量在進(jìn)程中不共享

# 示例:
import os
import time

g_num = 100

ret = os.fork()
if ret == 0:
    print("=== process 1 ===")
    g_num += 1
    print("=== process 1  g_num = %d===" % g_num)
else:
    time.sleep(3)
    print("=== process 2 ===")
    print("=== process 2 g_num = %d ===" % g_num)

# 輸出:
=== process 1 ===
=== process 1  g_num = 101 ===
=== process 2 ===
=== process 2  g_num = 100 ===

1.3 多次fork

# 示例:
import os

ret = os.fork()
if ret == 0:
    print("=== 1 ===")
else:
    print("=== 2 ===")

ret = os.fork()
if ret == 0:
    print("=== 3 ===")
else:
    print("=== 4 ===")

# 輸出:
=== 2 ===
=== 4 ===
=== 1 ===
=== 3 ===
=== 4 ===
=== 3 ===

1.4 Process 創(chuàng)建子進(jìn)程

# 示例
from multiprocessing import Process
import os

# 子進(jìn)程要執(zhí)行的代碼
def run_proc(name):
    print('子進(jìn)程運(yùn)行中, name=%s, pid=%d...' % (name, os.getpid()))


if __name__=='__main__':
    print('父進(jìn)程 %d.' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print('子進(jìn)程將要執(zhí)行')
    p.start()   # 創(chuàng)建子進(jìn)程
    p.join()    # 等到子進(jìn)程實(shí)例(p)執(zhí)行結(jié)束后,才執(zhí)行后面的代碼
    print('子進(jìn)程已結(jié)束')

1.5 Process 的子類

# 示例:
from multiprocessing import Process
import time
import os

# 繼承Process類
class Process_Class(Process):
    # 因?yàn)镻rocess類本身也有 __init__ 方法,這個(gè)類相當(dāng)于重寫(xiě)了這個(gè)方法;
    # 但這樣就會(huì)帶來(lái)一個(gè)問(wèn)題,我們并沒(méi)有完全的初始化一個(gè) Process 類, 所以就不能使用從這個(gè)類繼承的一些方法
    # 最好的方法就是將繼承類本身傳遞給 Process.__init__ 方法, 完成這些初始化操作
    def __init__(self, interval):
        # super(Process_Class, self).__init__()
        Process.__init__(self)
        self.interval = interval

    # 重寫(xiě)Process類的run()方法
    def run(self):
        print("子進(jìn)程(%s) 開(kāi)始執(zhí)行, 父進(jìn)程為 (%s)" % (os.getpid(), os.getppid()))
        t_start = time.time()
        time.sleep(self.interval)
        t_stop = time.time()
        print("(%s)執(zhí)行結(jié)束, 耗時(shí)%0.2f秒" % (os.getpid(), t_stop-t_start))

if __name__=="__main__":
    t_start = time.time()
    print("當(dāng)前進(jìn)程(%s)" % os.getpid())
    p1 = Process_Class(2)
    # 對(duì)一個(gè)不包含target屬性的Process類執(zhí)行start()方法,就會(huì)運(yùn)行這個(gè)類中的 run() 方法
    p1.start()
    p1.join()
    t_stop = time.time()
    print("(%s)執(zhí)行結(jié)束, 耗時(shí)%0.2f" % (os.getpid(), t_stop-t_start))

1.6 進(jìn)程池 Pool(非阻塞方式)

# 示例:
from multiprocessing import Pool
import os, time, random

def worker(msg):
    t_start = time.time()
    print("%s開(kāi)始執(zhí)行, 進(jìn)程號(hào)為%d" % (msg, os.getpid()))
    # random.random()隨機(jī)生成0~1之間的浮點(diǎn)數(shù)
    time.sleep(random.random()*2)
    t_stop = time.time()
    print(msg, "執(zhí)行完畢, 耗時(shí)%0.2f" % (t_stop-t_start))

po = Pool(3)  # 定義一個(gè)進(jìn)程池, 最大進(jìn)程數(shù) 3
for i in range(0, 10):
    # Pool.apply_async(要調(diào)用的目標(biāo),(傳遞給目標(biāo)的參數(shù)元組,))
    # 每次循環(huán)將會(huì)用空閑出來(lái)的子進(jìn)程去調(diào)用目標(biāo)
    po.apply_async(worker,(i,))

print("=== start ===")
po.close()  # 關(guān)閉進(jìn)程池,關(guān)閉后po不再接收新的請(qǐng)求
po.join()   # 等待po中所有子進(jìn)程執(zhí)行完成, 必須放在close語(yǔ)句之后
print("=== close ===")

1.7 apply 阻塞方式添加任務(wù)

# 示例:
from multiprocessing import Pool
import os, time, random

def worker(msg):
    t_start = time.time()
    print("%s開(kāi)始執(zhí)行, 進(jìn)程號(hào)為%d" % (msg, os.getpid()))
    # random.random()隨機(jī)生成0~1之間的浮點(diǎn)數(shù)
    time.sleep(random.random()*2)
    t_stop = time.time()
    print(msg, "執(zhí)行完畢, 耗時(shí)%0.2f" % (t_stop-t_start))

po = Pool(3)  # 定義一個(gè)進(jìn)程池, 最大進(jìn)程數(shù) 3
for i in range(0, 10):
    po.apply(worker, (i,))

print("=== start ===")
po.close()
po.join()
print("=== close ===")

1.8 進(jìn)程間通信(Queue)

# 示例一: Queue 存取數(shù)據(jù)
from multiprocessing import Queue
q=Queue(3)  # 初始化一個(gè)Queue對(duì)象,最多可接收三條put消息
q.put("消息1")
q.put("消息2")
print(q.full())     # False
q.put("消息3")
print(q.full())     # True

# 因?yàn)橄㈥?duì)列已滿,下面的try會(huì)拋出異常,第一個(gè)try會(huì)等待2秒后再拋出異常,第二個(gè)try會(huì)立刻拋出異常
try:
    q.put("消息4",True,2)
except:
    print("消息隊(duì)列已滿,現(xiàn)有消息數(shù)量: %s" % q.qsize())

try:
    q.put_nowait("消息4")
except:
    print("消息隊(duì)列已滿,現(xiàn)有消息數(shù)量: %s" % q.qsize())

# 推薦的方式: 先判斷消息隊(duì)列是否已滿, 再寫(xiě)入
if not q.full():
    q.put_nowait("消息4")

# 讀取消息時(shí), 先判斷消息隊(duì)列是否為空,再讀取
if not q.empty():
    for i in range(q.qsize()):
        print(q.get_nowait())



# 示例二: 進(jìn)程間通信
# 在父進(jìn)程中創(chuàng)建兩個(gè)子進(jìn)程,一個(gè)往Queue里寫(xiě)數(shù)據(jù), 一個(gè)從Queue里讀數(shù)據(jù)
from multiprocessing import Process, Queue
import os, time, random

# 寫(xiě)數(shù)據(jù)進(jìn)程執(zhí)行代碼
def write(q):
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# 讀數(shù)據(jù)進(jìn)程執(zhí)行代碼
def read(q):
    while True:
        if not q.empty():
            value = q.get(True)
            print('Get %s from queue.' % value)
            time.sleep(random.random())
        else:
            break


if __name__=='__main__':
    # 父進(jìn)程創(chuàng)建Queue,并傳給各子進(jìn)程
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 啟動(dòng)子進(jìn)程pw, 寫(xiě)入:
    pw.start()
    # 等待pw結(jié)束
    pw.join()
    # 啟動(dòng)子進(jìn)程pr, 讀取:
    pr.start()
    pr.join()
    # 所有子進(jìn)程執(zhí)行結(jié)束,輸出
    print('所有數(shù)據(jù)都寫(xiě)入并讀完')

1.8.1 進(jìn)程間通信(Pipe)

# 示例:
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([33, None, 'hello'])
    conn.close()

if __name__ = '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target = f, args = (child_conn, ))
    p.start()
    print(parent_conn.recv())
    p.join()

1.9 進(jìn)程池中的Queue

在進(jìn)程池中,需要使用 multiprocessing.Manager()中的 Queue();而不是 multiprocessing.Queue()

# 示例:
from multiprocessing import Manager, Pool
import os, time, random


# 寫(xiě)數(shù)據(jù)進(jìn)程執(zhí)行代碼
def reader(q):
    print("reader啟動(dòng)(%s), 父進(jìn)程為(%s)" % (os.getpid(), os.getppid()))
    for i in range(q.qsize()):
        print("reader從Queue獲取到消息: %s" %q.get(True))

# 讀數(shù)據(jù)進(jìn)程執(zhí)行代碼:
def writer(q):
    print("writer啟動(dòng)(%s), 父進(jìn)程為(%s)" % (os.getpid(), os.getppid()))
    for i in ['A', 'B', 'C']:
        q.put(i)

if __name__=="__main__":
    print("(%s) start" % os.getpid())
    q=Manager().Queue()     # 使用Manager中的Queue來(lái)初始化
    po=Pool()
    # 使用阻塞模式創(chuàng)建進(jìn)程,這樣就不需要在reader中使用死循環(huán)了, 可以讓writer執(zhí)行完成后,再用reader讀取
    po.apply(writer,(q,))
    po.apply(reader,(q,))
    po.close()
    po.join()
    print("(%s) End" % os.getpid())

參考資料:

Python 核心編程

總結(jié)

以上是生活随笔為你收集整理的Python 进程(process)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。