im在我打印输出时从不出现在python线程中,这似乎是并行运行的。此外,我的函数所花的时间与使用库并发(ThreadPoolExecutor)之前的时间相同。我必须计算数据集上某些属性的收益(我不能使用库)。由于我大约有1024个属性,并且该函数花了大约一分钟的时间执行(并且必须在迭代中使用它),我竭力将attributes
数组拆分为10个(仅作为示例)并运行Separete每个子数组分别具有函数gain(attribute)
。因此,我执行了以下操作(避免了一些不必要的代码):
def calculate_gains(self):
splited_attributes = np.array_split(self.attributes, 10)
result = {}
for atts in splited_attributes:
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(self.calculate_gains_helper, atts)
return_value = future.result()
self.gains = {**self.gains, **return_value}
这里是calculate_gains_helper:
def calculate_gains_helper(self, attributes):
inter_result = {}
for attribute in attributes:
inter_result[attribute] = self.gain(attribute)
return inter_result
我做错什么了吗?我读了其他一些较旧的文章,但没有任何信息。非常感谢您的帮助!
由于GIL,Python线程无法并行运行(至少在CPython实现中)。使用进程和ProcessPoolExecutor真正具有并行性
with concurrent.futures.ProcessPoolExecutor() as executor:
...
您提交,然后依次等待每个工作项,因此所有线程所做的事情都会减慢一切。我不能保证这会大大加快速度,因为您仍在处理python GIL,该GIL使python级别的内容无法并行工作,但是这里行了。
我创建了一个线程池,并将所有可能的东西都推送到了worker中,包括self.attributes
的切片。
def calculate_gains(self):
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
result_list = executor.map(self.calculate_gains_helper,
(i, i+10) for i in range(0, len(self.attributes), 10))
for return_value in result_list:
self.gains = {**self.gains, **return_value}
def calculate_gains_helper(self, start_end):
start, end = start_end
inter_result = {}
for attribute in self.attributes[start:end]:
inter_result[attribute] = self.gain(attribute)
return inter_result