通过使用并发.futures.ProcessPoolExecutor 的以下程序,我试图使工作进程能够在全局范围内修改 var 字典。然后我希望能够使用修改后的 var 字典。
from time import sleep
import concurrent.futures as cf
tasks = ["aaa", "bbb", "ccc"]
def my_worker(scan_task):
global var
print(scan_task)
var[scan_task] = 0
print(var)
sleep(1)
var = {}
if __name__ == '__main__':
with cf.ProcessPoolExecutor() as executor:
futures = [executor.submit(my_worker, scan_task) for scan_task in tasks]
cf.wait(futures)
print("finished")
print(var)
工作进程能够正确修改它。然而,在进程完成其部分之后,当我尝试打印 var 时,我收到以下输出,var 是一个空字典:
aaa
{'aaa': 0}
bbb
{'aaa': 0, 'bbb': 0}
ccc
{'aaa': 0, 'bbb': 0, 'ccc': 0}
finished
{}
工人完成工作后,如何在主函数中使用修改后的 var 字典?
以下是预期的输出:
aaa
{'aaa': 0}
bbb
{'aaa': 0, 'bbb': 0}
ccc
{'aaa': 0, 'bbb': 0, 'ccc': 0}
finished
{'aaa': 0, 'bbb': 0, 'ccc': 0}
除非您设置了共享内存,否则进程中的内存更改不会被其父进程看到。您无法直接在共享内存中实例化Python对象,因此需要将它们序列化并传输到父级以获得结果。
对于示例中显示的工作流程,最好使用
map
来完成。 map
将数据发送到工作函数并检索函数的结果。更改您的工作人员以返回值而不是尝试更新var
。由于您想要生成任务价值图,因此 map
可以与 zip
一起使用来构建结果 dict
。
from time import sleep
import concurrent.futures as cf
tasks = ["aaa", "bbb", "ccc"]
def my_worker(scan_task):
print("working on", scan_task)
sleep(1)
return scan_task
var = {}
if __name__ == '__main__':
with cf.ProcessPoolExecutor() as executor:
var = dict(zip(tasks, executor.map(my_worker, tasks)))
print(var)