我想使用python中的concurrent.futures模块来并行更新全局变量
事实证明,使用 ThreadPoolExecutor 可以更新我的全局变量,但 CPU 没有充分利用其潜力(始终在 5-10%),速度太慢了
并且 ProcessPoolExecutor 可以使用所有核心(100%),但我的全局变量无法更新,因为它们不共享相同的全局变量
如何在并发.futures 模型中使用 ProcessPoolExecutor 共享我的全局变量。非常感谢您的帮助
进程看起来不像使用相同内存空间的线程。所以你需要一些特殊的方法来更新变量。
ProcessPoolExecutor
使用multiprocessing
模块,有两种共享数据的方式,共享内存和服务器进程。第一种使用共享内存映射的方式,服务器进程使用 Manager
对象,就像代理一样保存共享数据。服务器进程更灵活,共享内存更高效。
使用像
ThreadPoolExecutor
这样的服务器进程共享数据,只需将参数传递给你的函数即可。
def running_proxy(mval):
# consider lock if you need
return mval.value
def start_executor():
with multiprocessing.Manager() as manager:
executor = ProcessPoolExecutor(max_workers=5)
mval = manager.Value('b', 1)
futures = [executor.submit(running_proxy, mval) for _ in range(5)]
results = [x.result() for x in futures]
executor.shutdown()
但是共享内存方式有一些不同,你需要将共享变量设置为全局。
def running_shared():
# consider lock if you need
return sval.value
def set_global(args):
global sval
sval = args
def start_executor():
sval = multiprocessing.Value('b', 1)
# for 3.7+
executor = ProcessPoolExecutor(max_workers=5, initializer=set_global, initargs=(sval,))
# for ~3.6
# set_global(sval)
# executor = ProcessPoolExecutor(max_workers=5)
futures = [executor.submit(running_shared) for _ in range(5)]
results = [x.result() for x in futures]
executor.shutdown()
要共享全局变量,您必须使用Queue或上下文管理器。最简单的方法是使用列表或字典来存储
ProcessPoolExecutor
! 的结果
import concurrent.futures
# Some function
def do_something(arg):
output = arg*10
return output
# List to iterate
sample_list = [5, 4, 6, 2, 3]
# Creating a dictionary to store the results
with concurrent.futures.ProcessPoolExecutor() as executor:
results = {i:executor.submit(do_something, sample_list[i]) for i in range(len(sample_list))}
# The results are stored in "results" dict
# It can be accessed as a normal dict
for key, values in results.items():
print(values.result())
您也可以使用
map
代替 submit
。这也确保了函数的输出序列与输入相对应!
with concurrent.futures.ProcessPoolExecutor() as executor:
results = executor.map(do_something, sample_list)
for result in results:
print(result)