日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

python 多进程共享变量manager_python 进程间共享数据 multiprocessing 通信问题 — Manager...

發布時間:2025/3/20 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python 多进程共享变量manager_python 进程间共享数据 multiprocessing 通信问题 — Manager... 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Python中進程間共享數據,處理基本的queue,pipe和value+array外,還提供了更高層次的封裝。使用multiprocessing.Manager可以簡單地使用這些高級接口。

Manager()返回的manager對象控制了一個server進程,此進程包含的python對象可以被其他的進程通過proxies來訪問。從而達到多進程間數據通信且安全。

Manager支持的類型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。

Manager的dict,list使用

import multiprocessing

import time

def worker(d, key, value):

d[key] = value

if __name__ == '__main__':

mgr = multiprocessing.Manager()

d = mgr.dict()

jobs = [ multiprocessing.Process(target=worker, args=(d, i, i*2))

for i in range(10)

]

for j in jobs:

j.start()

for j in jobs:

j.join()

print ('Results:' )

for key, value in enumerate(dict(d)):

print("%s=%s" % (key, value))

# the output is :

# Results:

# 0=0

# 1=1

# 2=2

# 3=3

# 4=4

# 5=5

# 6=6

# 7=7

# 8=8

# 9=9

import multiprocessing

import time

def worker(d, key, value):

d[key] = value

if __name__ == '__main__':

mgr = multiprocessing.Manager()

d = mgr.dict()

jobs = [ multiprocessing.Process(target=worker, args=(d, i, i*2))

for i in range(10)

]

for j in jobs:

j.start()

for j in jobs:

j.join()

print ('Results:' )

for key, value in enumerate(dict(d)):

print("%s=%s" % (key, value))

# the output is :

# Results:

# 0=0

# 1=1

# 2=2

# 3=3

# 4=4

# 5=5

# 6=6

# 7=7

# 8=8

# 9=9

文章來源:https://www.cnblogs.com/caodneg7/p/9520069.html

Python 多進程默認不能共享全局變量

主進程與子進程是并發執行的,進程之間默認是不能共享全局變量的(子進程不能改變主進程中全局變量的值)。如果要共享全局變量需要用(multiprocessing.Value("d",10.0),數值)(multiprocessing.Array("i",[1,2,3,4,5]),數組)(multiprocessing.Manager().dict(),字典)(multiprocessing.Manager().list(range(5)))。進程通信(進程之間傳遞數據)用進程隊列(multiprocessing.Queue(),單向通信),管道( multiprocessing.Pipe() ,雙向通信)。

這里要注意多線程之間數據共享、多進程之間數據共享。。。。。。。多線程用全局變量(global)

全局變量,主進程與子進程是并發執行的,他們不能共享全局變量(子進程不能改變主進程中全局變量的值)

由于進程之間不共享內存,所以進程之間的通信不能像線程之間直接引用,因而需要采取一些策略來完成進程之間的數據通信。

本文記錄使用 Manager 來完成進程間通信的方式。

首先描述需求:

場景:頂層邏輯負責管理,我們定義為C,由C啟動A、B兩個進程聯合完成功能

需求:A、B聯合工作過程中的數據通信

python 多進程

解決思路:

利用頂層C創建一個 Manager,由 Manager 提供數據池分發給A、B使用,從而完成兩個進程之間的通信。

#-*-encoding:utf-8-*-

from multiprocessing import Process, Manager

from time import sleep

def thread_a_main(sync_data_pool): # A 進程主函數,存入100+的數

for ix in range(100, 105):

sleep(1)

sync_data_pool.append(ix)

def thread_b_main(sync_data_pool): # B 進程主函數,存入300+的數

for ix in range(300, 309):

sleep(0.6)

sync_data_pool.append(ix)

def _test_case_000(): # 測試用例

manager = Manager() # multiprocessing 中的 Manager 是一個工廠方法,直接獲取一個 SyncManager 的實例

sync_data_pool = manager.list() # 利用 SyncManager 的實例來創建同步數據池

Process(target=thread_a_main, args=(sync_data_pool, )).start() # 創建并啟動 A 進程

Process(target=thread_b_main, args=(sync_data_pool, )).start() # 創建并啟動 B 進程

for ix in range(6): # C 進程(主進程)中實時的去查看數據池中的數據

sleep(1)

print(sync_data_pool)

if '__main__' == __name__: # 養成好習慣,將測試用例單獨列出

_test_case_000()

#-*-encoding:utf-8-*-

from multiprocessing import Process, Manager

from time import sleep

def thread_a_main(sync_data_pool): # A 進程主函數,存入100+的數

for ix in range(100, 105):

sleep(1)

sync_data_pool.append(ix)

def thread_b_main(sync_data_pool): # B 進程主函數,存入300+的數

for ix in range(300, 309):

sleep(0.6)

sync_data_pool.append(ix)

def _test_case_000(): # 測試用例

manager = Manager() # multiprocessing 中的 Manager 是一個工廠方法,直接獲取一個 SyncManager 的實例

sync_data_pool = manager.list() # 利用 SyncManager 的實例來創建同步數據池

Process(target=thread_a_main, args=(sync_data_pool, )).start() # 創建并啟動 A 進程

Process(target=thread_b_main, args=(sync_data_pool, )).start() # 創建并啟動 B 進程

for ix in range(6): # C 進程(主進程)中實時的去查看數據池中的數據

sleep(1)

print(sync_data_pool)

if '__main__' == __name__: # 養成好習慣,將測試用例單獨列出

_test_case_000()

輸出結果:

[]

[300, 100, 301]

[300, 100, 301, 101, 302, 303]

[300, 100, 301, 101, 302, 303, 102, 304]

[300, 100, 301, 101, 302, 303, 102, 304, 305, 103, 306]

[300, 100, 301, 101, 302, 303, 102, 304, 305, 103, 306, 104, 307, 308]

從結果來看, 我們的300+和100+的數據并行的被插入到同一數據池中了,也就是說,通信的目的達到了

只是從目前的方式來看,接收方必須主動的去查詢數據池,類似于信箱的方式了

除了注釋中的內容外,還需要注意:

1、manager 要在頂層創建

2、同步數據池是通過Manager自身的工廠方法創建的,這里 manager.list() 調用一次即產生一個新的數據池,而不是返回同一個數據池實例,所以數據池的實例需要做好管理

3、可以用的數據格式不僅list,可以選擇如下(參考multiprocessing.managers)

class SyncManager(BaseManager):

def BoundedSemaphore(self, value: Any = ...) -> threading.BoundedSemaphore: ...

def Condition(self, lock: Any = ...) -> threading.Condition: ...

def Event(self) -> threading.Event: ...

def Lock(self) -> threading.Lock: ...

def Namespace(self) -> _Namespace: ...

def Queue(self, maxsize: int = ...) -> queue.Queue: ...

def RLock(self) -> threading.RLock: ...

def Semaphore(self, value: Any = ...) -> threading.Semaphore: ...

def Array(self, typecode: Any, sequence: Sequence[_T]) -> Sequence[_T]: ...

def Value(self, typecode: Any, value: _T) -> _T: ...

def dict(self, sequence: Mapping[_KT, _VT] = ...) -> Dict[_KT, _VT]: ...

def list(self, sequence: Sequence[_T] = ...) -> List[_T]: ...

進程之間共享數據(數值型):

import multiprocessing

def func(num):

num.value=10.78 #子進程改變數值的值,主進程跟著改變

if __name__=="__main__":

num=multiprocessing.Value("d",10.0) # d表示數值,主進程與子進程共享這個value。(主進程與子進程都是用的同一個value)

print(num.value)

p=multiprocessing.Process(target=func,args=(num,))

p.start()

p.join()

print(num.value)

import multiprocessing

def func(num):

num.value=10.78 #子進程改變數值的值,主進程跟著改變

if __name__=="__main__":

num=multiprocessing.Value("d",10.0) # d表示數值,主進程與子進程共享這個value。(主進程與子進程都是用的同一個value)

print(num.value)

p=multiprocessing.Process(target=func,args=(num,))

p.start()

p.join()

print(num.value)

進程之間共享數據(數組型):

import multiprocessing

def func(num):

num[2]=9999 #子進程改變數組,主進程跟著改變

if __name__=="__main__":

num=multiprocessing.Array("i",[1,2,3,4,5]) #主進程與子進程共享這個數組

print(num[:])

p=multiprocessing.Process(target=func,args=(num,))

p.start()

p.join()

print(num[:])

import multiprocessing

def func(num):

num[2]=9999 #子進程改變數組,主進程跟著改變

if __name__=="__main__":

num=multiprocessing.Array("i",[1,2,3,4,5]) #主進程與子進程共享這個數組

print(num[:])

p=multiprocessing.Process(target=func,args=(num,))

p.start()

p.join()

print(num[:])

進程之間共享數據(dict,list):

import multiprocessing

def func(mydict,mylist):

mydict["index1"]="aaaaaa" #子進程改變dict,主進程跟著改變

mydict["index2"]="bbbbbb"

mylist.append(11) #子進程改變List,主進程跟著改變

mylist.append(22)

mylist.append(33)

if __name__=="__main__":

with multiprocessing.Manager() as MG: #重命名

mydict=multiprocessing.Manager().dict() #主進程與子進程共享這個字典

mylist=multiprocessing.Manager().list(range(5)) #主進程與子進程共享這個List

p=multiprocessing.Process(target=func,args=(mydict,mylist))

p.start()

p.join()

print(mylist)

print(mydict)

import multiprocessing

def func(mydict,mylist):

mydict["index1"]="aaaaaa" #子進程改變dict,主進程跟著改變

mydict["index2"]="bbbbbb"

mylist.append(11) #子進程改變List,主進程跟著改變

mylist.append(22)

mylist.append(33)

if __name__=="__main__":

with multiprocessing.Manager() as MG: #重命名

mydict=multiprocessing.Manager().dict() #主進程與子進程共享這個字典

mylist=multiprocessing.Manager().list(range(5)) #主進程與子進程共享這個List

p=multiprocessing.Process(target=func,args=(mydict,mylist))

p.start()

p.join()

print(mylist)

print(mydict)

-----------------------------------------------------------------

Value、Array是通過共享內存的方式共享數據

Manager是通過共享進程的方式共享數據。

Value\Array

實例代碼:

import multiprocessing

#Value/Array

def func1(a,arr):

a.value=3.14

for i in range(len(arr)):

arr[i]=-arr[i]

if __name__ == '__main__':

num=multiprocessing.Value('d',1.0)#num=0

arr=multiprocessing.Array('i',range(10))#arr=range(10)

p=multiprocessing.Process(target=func1,args=(num,arr))

p.start()

p.join()

print num.value

print arr[:]

import multiprocessing

#Value/Array

def func1(a,arr):

a.value=3.14

for i in range(len(arr)):

arr[i]=-arr[i]

if __name__ == '__main__':

num=multiprocessing.Value('d',1.0)#num=0

arr=multiprocessing.Array('i',range(10))#arr=range(10)

p=multiprocessing.Process(target=func1,args=(num,arr))

p.start()

p.join()

print num.value

print arr[:]

執行結果:

3.14

[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

1

2

3

3.14

[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

1

2

3

Manager管理的共享數據類型有:Value、Array、dict、list、Lock、Semaphore等等,同時Manager還可以共享類的實例對象。

實例代碼:

from multiprocessing import Process,Manager

def func1(shareList,shareValue,shareDict,lock):

with lock:

shareValue.value+=1

shareDict[1]='1'

shareDict[2]='2'

for i in xrange(len(shareList)):

shareList[i]+=1

if __name__ == '__main__':

manager=Manager()

list1=manager.list([1,2,3,4,5])

dict1=manager.dict()

array1=manager.Array('i',range(10))

value1=manager.Value('i',1)

lock=manager.Lock()

proc=[Process(target=func1,args=(list1,value1,dict1,lock)) for i in xrange(20)]

for p in proc:

p.start()

for p in proc:

p.join()

print list1

print dict1

print array1

print value1

from multiprocessing import Process,Manager

def func1(shareList,shareValue,shareDict,lock):

with lock:

shareValue.value+=1

shareDict[1]='1'

shareDict[2]='2'

for i in xrange(len(shareList)):

shareList[i]+=1

if __name__ == '__main__':

manager=Manager()

list1=manager.list([1,2,3,4,5])

dict1=manager.dict()

array1=manager.Array('i',range(10))

value1=manager.Value('i',1)

lock=manager.Lock()

proc=[Process(target=func1,args=(list1,value1,dict1,lock)) for i in xrange(20)]

for p in proc:

p.start()

for p in proc:

p.join()

print list1

print dict1

print array1

print value1

執行結果:

[21, 22, 23, 24, 25]

{1: '1', 2: '2'}

array('i', [0, 1, 2, 3, 4, 5, 6, 7, 8, 9])

Value('i', 21)

[21, 22, 23, 24, 25]

{1: '1', 2: '2'}

array('i', [0, 1, 2, 3, 4, 5, 6, 7, 8, 9])

Value('i', 21)

通過Manager進程間共享實例對象:

from multiprocessing import Process,Value,Lock

from multiprocessing.managers import BaseManager

class Employee(object):

def __init__(self,name,salary):

self.name=name

self.salary=Value('i',salary)

def increase(self):

self.salary.value+=100

def getPay(self):

return self.name+':'+str(self.salary.value)

class MyManager(BaseManager):

pass

def Manager2():

m=MyManager()

m.start()

return m

MyManager.register('Employee',Employee)

def func1(em,lock):

with lock:

em.increase()

if __name__ == '__main__':

manager=Manager2()

em=manager.Employee('zhangsan',1000)

lock=Lock()

proces=[Process(target=func1,args=(em,lock))for i in xrange(10)]

for p in proces:

p.start()

for p in proces:

p.join()

print em.getPay()

from multiprocessing import Process,Value,Lock

from multiprocessing.managers import BaseManager

class Employee(object):

def __init__(self,name,salary):

self.name=name

self.salary=Value('i',salary)

def increase(self):

self.salary.value+=100

def getPay(self):

return self.name+':'+str(self.salary.value)

class MyManager(BaseManager):

pass

def Manager2():

m=MyManager()

m.start()

return m

MyManager.register('Employee',Employee)

def func1(em,lock):

with lock:

em.increase()

if __name__ == '__main__':

manager=Manager2()

em=manager.Employee('zhangsan',1000)

lock=Lock()

proces=[Process(target=func1,args=(em,lock))for i in xrange(10)]

for p in proces:

p.start()

for p in proces:

p.join()

print em.getPay()

資料來源:https://blog.csdn.net/houyanhua1/article/details/78236514

https://blog.csdn.net/houyanhua1/article/details/78244288

https://blog.csdn.net/lechunluo3/article/details/79005910

總結

以上是生活随笔為你收集整理的python 多进程共享变量manager_python 进程间共享数据 multiprocessing 通信问题 — Manager...的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。