线程未使用ThreadPoolExecutor在并行python中执行

问题描述 投票:1回答:2

im在我打印输出时从不出现在python线程中,这似乎是并行运行的。此外,我的函数所花的时间与使用库并发(ThreadPoolExecutor)之前的时间相同。我必须计算数据集上某些属性的收益(我不能使用库)。由于我大约有1024个属性,并且该函数花了大约一分钟的时间执行(并且必须在迭代中使用它),我竭力将attributes数组拆分为10个(仅作为示例)并运行Separete每个子数组分别具有函数gain(attribute)。因此,我执行了以下操作(避免了一些不必要的代码):

def calculate_gains(self):
    splited_attributes = np.array_split(self.attributes, 10)
    result = {}
    for atts in splited_attributes:
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future = executor.submit(self.calculate_gains_helper, atts)
            return_value = future.result()
            self.gains = {**self.gains, **return_value}

这里是calculate_gains_helper:

def calculate_gains_helper(self, attributes):
    inter_result = {}
    for attribute in attributes:
        inter_result[attribute] = self.gain(attribute)
    return inter_result

我做错什么了吗?我读了其他一些较旧的文章,但没有任何信息。非常感谢您的帮助!

python machine-learning concurrency python-multithreading
2个回答
1
投票

由于GIL,Python线程无法并行运行(至少在CPython实现中)。使用进程和ProcessPoolExecutor真正具有并行性

with concurrent.futures.ProcessPoolExecutor() as executor:
    ...

0
投票

您提交,然后依次等待每个工作项,因此所有线程所做的事情都会减慢一切。我不能保证这会大大加快速度,因为您仍在处理python GIL,该GIL使python级别的内容无法并行工作,但是这里行了。

我创建了一个线程池,并将所有可能的东西都推送到了worker中,包括self.attributes的切片。

def calculate_gains(self):
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        result_list = executor.map(self.calculate_gains_helper,
            (i, i+10) for i in range(0, len(self.attributes), 10))
    for return_value in result_list:
        self.gains = {**self.gains, **return_value}

def calculate_gains_helper(self, start_end):
    start, end = start_end
    inter_result = {}
    for attribute in self.attributes[start:end]:
        inter_result[attribute] = self.gain(attribute)
    return inter_result
© www.soinside.com 2019 - 2024. All rights reserved.