我面临一个非常持久的问题:我想在进程之间使用multiprocessing
中提出的进程可共享类型(例如dict
,list
等,它们由SyncManager
处理代理)共享复杂对象。我甚至实现了一些其他常见的类型,如deque
和set
。只要我在这些对象(浮点数,整数等)中存储简单值,一切都可以正常工作。
例如,如果我使用以下内容,这将按预期完美地工作:
import multiprocessing, time
manager = multiprocessing.Manager()
d = manager.list()
lock = manager.Lock()
def reader(d, lock):
for i in range(5):
with lock:
print(d)
sys.stdout.flush()
time.sleep(0.5)
def writer(d, lock):
for i in range(5):
with lock:
d.append(i)
time.sleep(0.5)
# Try to read and write the deque:
r = multiprocessing.Process(target=reader, args=(d, lock))
w = multiprocessing.Process(target=writer, args=(d, lock))
r.start()
w.start()
r.join()
w.join()
正如预期的那样,这会在阅读器的每次迭代中输出更新的列表。
当我想在这些对象中存储对象时出现问题。让我们说一个列表字典。显然,如果我将简单对象存储在可共享流程的dict中,那就不会这样做。所以我尝试了以下方法:
import multiprocessing, time
manager = multiprocessing.Manager()
d = manager.dict()
lock = manager.Lock()
# Add lists:
for i in range(5):
d[i] = manager.list()
def reader(d, lock):
for i in range(10):
with lock:
print(d)
sys.stdout.flush()
time.sleep(1)
def writer(d, lock):
for i in range(10):
with lock:
for j in range(5):
d[j].append(i)
time.sleep(1)
# Try to read and write the dict:
r = multiprocessing.Process(target=reader, args=(d, lock))
w = multiprocessing.Process(target=writer, args=(d, lock))
r.start()
w.start()
r.join()
w.join()
不幸的是,使用最后一段代码,更新的列表不会在进程之间共享。我认为使用代理可以做到,但事实并非如此。
我发现的唯一解决方案是通过明确地重新分配字典中的密钥来提交对列表所做的更改。显然,如果我这样做,我不再需要在列表上使用代理,因为它有不必要的成本。所以到目前为止我找到了一个解决方案来取代
d[j].append(i)
在作者:
l = d[j] # Somehow creates a local copy
l.append(i) # Modify the copy
d[j] = l # Reassign to commit the change
虽然这可以完成这项工作,但写入/理解并不容易,并且可能容易出错(忘记提交内容等)。
我在这里错过了什么吗?有没有办法在共享对象中获取共享对象?
仅仅为了完整起见,这里是我最终的代码,它完成了我的工作,但不是我想做的事情:
import multiprocessing, time
manager = multiprocessing.Manager()
d = manager.dict()
lock = manager.Lock()
# Add lists:
for i in range(5):
d[i] = [] # manager.list() doesn't change anything
def reader(d, lock):
for i in range(10):
with lock:
print(d)
sys.stdout.flush()
time.sleep(1)
def writer(d, lock):
for i in range(10):
with lock:
for j in range(5):
l = d[j]
l.append(i)
d[j] = l
time.sleep(1)
# Try to read and write the dict:
r = multiprocessing.Process(target=reader, args=(d, lock))
w = multiprocessing.Process(target=writer, args=(d, lock))
r.start()
w.start()
r.join()
w.join()
确实不可能直接执行此操作,因为代理对象无法更改内部变量,因为它只是指向内存位置,并且在使用时将引用存储到该内存位置而不是实际值自定义或更高级的对象。
该文档有以下说明:
注意
对dict和列表代理中的可变值或项的修改将不会通过管理器传播,因为代理无法知道何时修改其值或项。要修改此类项,可以将修改后的对象重新分配给容器代理:
# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# reassigning the dictionary, the proxy is notified of the change
lproxy[0] = d
Python 3.8有共享列表。它们不是全功能的(一方面,你不能改变它们的长度),但它们具有列表的其他特征,包括支持各种类型。见ShareableList class。