concurrent.futures.ProcessPoolExecutor() python 中的共享变量

问题描述 投票:0回答:2

我想使用python中的concurrent.futures模块来并行更新全局变量

事实证明,使用 ThreadPoolExecutor 可以更新我的全局变量,但 CPU 没有充分利用其潜力(始终在 5-10%),速度太慢了

并且 ProcessPoolExecutor 可以使用所有核心(100%),但我的全局变量无法更新,因为它们不共享相同的全局变量

如何在并发.futures 模型中使用 ProcessPoolExecutor 共享我的全局变量。非常感谢您的帮助

python concurrent.futures
2个回答
8
投票

进程看起来不像使用相同内存空间的线程。所以你需要一些特殊的方法来更新变量。

ProcessPoolExecutor
使用
multiprocessing
模块,有两种共享数据的方式,共享内存和服务器进程。第一种使用共享内存映射的方式,服务器进程使用
Manager
对象,就像代理一样保存共享数据。服务器进程更灵活,共享内存更高效。

使用像

ThreadPoolExecutor
这样的服务器进程共享数据,只需将参数传递给你的函数即可。

def running_proxy(mval):
    # consider lock if you need
    return mval.value

def start_executor():
    with multiprocessing.Manager() as manager:
        executor = ProcessPoolExecutor(max_workers=5)
        mval = manager.Value('b', 1)
        futures = [executor.submit(running_proxy, mval) for _ in range(5)]
        results = [x.result() for x in futures]
        executor.shutdown()

但是共享内存方式有一些不同,你需要将共享变量设置为全局。

def running_shared():
    # consider lock if you need
    return sval.value

def set_global(args):
    global sval
    sval = args

def start_executor():
    sval = multiprocessing.Value('b', 1)
    # for 3.7+
    executor = ProcessPoolExecutor(max_workers=5, initializer=set_global, initargs=(sval,))
    # for ~3.6
    # set_global(sval)
    # executor = ProcessPoolExecutor(max_workers=5)
    futures = [executor.submit(running_shared) for _ in range(5)]
    results = [x.result() for x in futures]
    executor.shutdown()

0
投票

要共享全局变量,您必须使用Queue或上下文管理器。最简单的方法是使用列表或字典来存储

ProcessPoolExecutor

的结果
import concurrent.futures

# Some function
def do_something(arg):
    output = arg*10
    return output

# List to iterate
sample_list = [5, 4, 6, 2, 3]

# Creating a dictionary to store the results
with concurrent.futures.ProcessPoolExecutor() as executor:
     results = {i:executor.submit(do_something, sample_list[i]) for i in range(len(sample_list))}

# The results are stored in "results" dict
# It can be accessed as a normal dict

for key, values in results.items():
    print(values.result())

您也可以使用

map
代替
submit
。这也确保了函数的输出序列与输入相对应!

with concurrent.futures.ProcessPoolExecutor() as executor:
    results = executor.map(do_something, sample_list)

for result in results:
    print(result)
© www.soinside.com 2019 - 2024. All rights reserved.