日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

Python 学习笔记 多进程 multiprocessing

發布時間:2023/12/2 python 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Python 学习笔记 多进程 multiprocessing 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Python 解釋器有一個全局解釋器鎖(PIL),導致每個 Python 進程中最多同時運行一個線程,因此 Python 多線程程序并不能改善程序性能,不能發揮多核系統的優勢,可以通過這篇文章了解。

但是多進程程序不受此影響, Python 2.6 引入了 multiprocessing 來解決這個問題。這里介紹 multiprocessing 模塊下的進程,進程同步,進程間通信和進程管理四個方面的內容。 這里主要講解多進程的典型使用,multiprocessing 的 API 幾乎是完復制了 threading 的API, 因此只需花少量的時間就可以熟悉 threading 編程了。

Process

先來看一段代碼

1234567891011 from multiprocessing import Process, current_processdef func():time.sleep(1)proc = current_process()proc.name, proc.pidsub_proc = Process(target=func, args=())sub_proc.start()sub_proc.join()proc = current_process()proc.name, proc.pid

這是在主進程中創建子進程,然后啟動(start) 子進程,等待(join) 子進程執行完,再繼續執行主進程的整個的執行流程。

那么,一個進程應該是用來做什么的,它應該保存一些什么狀態,它的生命周期是什么樣的呢?

一個進程需要處理一些不同任務,或者處理不同的對象。創建進程需要一個 function 和相關參數,參數可以是dictProcess(target=func, args=(), kwargs = {}),name?可以用來標識進程。

控制子進程進入不同階段的是?start(),?join(),?is_alive(),?terminate(),?exitcode?方法。這些方法只能在創建子進程的進程中執行。

進程同步

Lock

鎖是為了確保數據一致性,比如讀寫鎖,每個進程給一個變量增加 1 ,但是如果在一個進程讀取但還沒有寫入的時候,另外的進程也同時讀取了,并寫入該值,則最后寫入的值是錯誤的,這時候就需要鎖。

123456789 def func(lock):lock.acquire() # do mysql query select update ...lock.release() lock = Lock()for i in xrange(4):proc = Process(target=func, args=(lock))proc.start()

Lock 同時也實現了 ContextManager API, 可以結合 with 語句使用, 關于 ContextManager, 請移步?Python 學習實踐筆記 裝飾器 與 context?查看。

Semaphore

Semaphore 和 Lock 稍有不同,Semaphore 相當于 N 把鎖,獲取其中一把就可以執行了。 信號量的總數 N 在構造時傳入,s = Semaphore(N)。 和 Lock 一樣,如果信號量為0,則進程堵塞,直到信號大于0。

Pipes

Pipe 是在兩個進程之間通信的工具,Pipe Constructor 會返回兩個端

1 conn1, conn2 = Pipe(True)

如果是全雙工的(構造函數參數為True),則雙端口都可接收發送,否則前面的端口用于接收,后面的端口用于發送。

1234567891011 def proc1(pipe): for i in xrange(10000):pipe.send(i)def proc2(pipe): while True: print "proc2 rev:", pipe.recv()pipe = Pipe()Process(target=proc1, args=(pipe[0],)).start()Process(target=proc2, args=(pipe[1],)).start()

Pipe 的每個端口同時最多一個進程讀寫,否則數據會出各種問題

Queues

multiprocessing.Queue 與 Queue.Queue 非常相似。其 API 列表如下

  • qsize()
  • empty()
  • full()
  • put()
  • put_nowait()
  • get()
  • get_nowait()
  • close()
  • join_thread()
  • cancel_join_thread()

當 Queue 為 Queue.Full 狀態時,再 put() 會堵塞,當狀態為 Queue.Empty 時,再 get() 也是。當 put() 或 get() 設置了超時參數,而超時的時候,會拋出異常。

Queue 主要用于多個進程產生和消費,一般使用情況如下

123456789101112 def producer(q): for i in xrange(10):q.put(i)def consumer(q): while True: print "consumer", q.get()q = Queue(40)for i in xrange(10):Process(target=producer, args=(q,)).start()Process(target=consumer, args=(q,)).start()

十個生產者進程,一個消費者進程,共用同一個隊列進行同步。

有一個簡化版本的 multiprocessing.queues.SimpleQueue, 只支持3個方法 empty(), get(), put()。

也有一個強化版本的 JoinableQueue, 新增兩個方法 task_done() 和 join()。 task_done() 是給消費者使用的,每完成隊列中的一個任務,調用一次該方法。當所有的 tasks 都完成之后,交給調用 join() 的進程執行。

123456789101112131415 def consumer(q): while True: print "consumer", q.get()q.task_done()jobs = JoinableQueue()for i in xrange(10):jobs.put(i)for i in xrange(10):p = Process(target=consumer, args=(jobs,))p.daemon = Truep.start()jobs.join()

這個 join 函數等待 JoinableQueue 為空的時候,等待就結束,外面的進程可以繼續執行了,但是那10個進程干嘛去了呢,他們還在等待呀,上面是設置了?p.daemon = True, 子進程才隨著主進程結束的,如果沒有設置,它們還是會一直等待的呢。

Lock、Pipe、Queue 和 Pipe 需要注意的是:盡量避免使用 Process.terminate 來終止程序,否則將會導致很多問題, 詳情請移步python 官方文檔查看。

進程間數據共享

前一節中, Pipe、Queue 都有一定數據共享的功能,但是他們會堵塞進程, 這里介紹的兩種數據共享方式都不會堵塞進程, 而且都是多進程安全的。

共享內存

共享內存有兩個結構,一個是?Value, 一個是?Array,這兩個結構內部都實現了鎖機制,因此是多進程安全的。 用法如下:

1234567891011 def func(n, a):n.value = 50 for i in range(len(a)):a[i] += 10num = Value('d', 0.0)ints= Array('i', range(10))p = Process(target=func, args=(num, ints))p.start()p.join()

Value 和 Array 都需要設置其中存放值的類型,d 是 double 類型,i 是 int 類型,具體的對應關系在Python 標準庫的 sharedctypes 模塊中查看。

服務進程 Manager

上面的共享內存支持兩種結構 Value 和 Array, 這些值在主進程中管理,很分散。 Python 中還有一統天下,無所不能的 Server process,專門用來做數據共享。 其支持的類型非常多,比如list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value 和 Array 用法如下:

123456789101112131415 from multiprocessing import Process, Managerdef func(dct, lst):dct[88] = 88lst.reverse()manager = Manager()dct = manager.dict()lst = manager.list(range(5,10))p = Process(target=func, args=(dct, lst))p.start()p.join()print dct, '|', lstOut: {88: 88} | [9, 8, 7, 6, 5]

一個 Manager 對象是一個服務進程,推薦多進程程序中,數據共享就用一個 manager 管理。

進程管理

如果有50個任務要執行, 但是 CPU 只有4核, 你可以創建50個進程來做這個事情。但是大可不必,徒增管理開銷。如果你只想創建4個進程,讓他們輪流替你完成任務,不用自己去管理具體的進程的創建銷毀,那 Pool 是非常有用的。

Pool 是進程池,進程池能夠管理一定的進程,當有空閑進程時,則利用空閑進程完成任務,直到所有任務完成為止,用法如下

12345 def func(x): return x*xpool = Pool(processes=4)print pool.map(func, range(8))

Pool 進程池創建4個進程,不管有沒有任務,都一直在進程池中等候,等到有數據的時候就開始執行。
Pool 的 API 列表如下:

  • apply(func[, args[, kwds]])
  • apply_async(func[, args[, kwds[, callback]]])
  • map(func, iterable[, chunksize])
  • map_async(func, iterable[, chunksize[, callback]])
  • imap(func, iterable[, chunksize])
  • imap_unordered(func, iterable[, chunksize])
  • close()
  • terminate()
  • join()

異步執行

apply_async 和 map_async 執行之后立即返回,然后異步返回結果。 使用方法如下

123456789101112 def func(x): return x*xdef callback(x): print x, 'in callback' pool = Pool(processes=4)result = pool.map_async(func, range(8), 8, callback)print result.get(), 'in main'Out:[0, 1, 4, 9, 16, 25, 36, 49] in callback[0, 1, 4, 9, 16, 25, 36, 49] in main

有兩個值得提到的,一個是 callback,另外一個是 multiprocessing.pool.AsyncResult。 callback 是在結果返回之前,調用的一個函數,這個函數必須只有一個參數,它會首先接收到結果。callback 不能有耗時操作,因為它會阻塞主線程。

AsyncResult 是獲取結果的對象,其 API 如下

  • get([timeout])
  • wait([timeout])
  • ready()
  • successful()

如果設置了 timeout 時間,超時會拋出 multiprocessing.TimeoutError 異常。wait 是等待執行完成。 ready 測試是否已經完成,successful 是在確定已經 ready 的情況下,如果執行中沒有拋出異常,則成功,如果沒有ready 就調用該函數,會得到一個 AssertionError 異常。

Pool 管理

這里不再繼續講 map 的各種變體了,因為從上面的 API 一看便知。

然后我們來看看 Pool 的執行流程,有三個階段。第一、一個進程池接收很多任務,然后分開執行任務;第二、不再接收任務了;第三、等所有任務完成了,回家,不干了。

這就是上面的方法,close 停止接收新的任務,如果還有任務來,就會拋出異常。 join 是等待所有任務完成。 join 必須要在 close 之后調用,否則會拋出異常。terminate 非正常終止,內存不夠用時,垃圾回收器調用的就是這個方法。

#python

總結

以上是生活随笔為你收集整理的Python 学习笔记 多进程 multiprocessing的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。