如何并行化Python嵌套循环?

问题描述 投票:0回答:2

我正在计算这个使用 Scipy 计算 wasserstein_distance 的函数。问题是数据[行][列]可能非常大(从大约 60k 值到 300k),并且需要花费大量时间来详细说明。有没有一种方法可以并行化它,以便为机器的每个处理器“分配”特定范围的元素来详细说明^即:第一个处理器可以详细说明前 24 个像素 (0,0) 和所有其他像素之间的距离 ( 0,0) 到 (23,23),第二个处理器计算接下来的 24 个像素(从 (1,0) 到 (1,23))与所有其他像素 (0,0) 到 (23,23) 之间的距离。欢迎任何其他建议:

def compute_wasserstein_distances(tflite, data, rows, columns, output_filename):

    # Initialize an empty dictionary to store wasserstein distances
    dist_similarity = {}
    
    # Iterate through rows and columns of the grid
    for row in tqdm(range(0, rows)):              
        for column in tqdm(range(0, columns), leave= False):
            for r in range(0, rows):
                for c in range(0, columns):
                    # Calculate the Wasserstein distance between the distributions
                    distance = wasserstein_distance(data[row][column], data[r][c])
                    
                    # Store the distance in the dictionary with corresponding indices
                    dist_similarity[((row, column), (r, c))] = distance

    print(f"[INFO] Saving pixel distribution to file ...")
    
    # Save the dist_similarity dictionary to a pickle file
    with open(f'{output_filename}/{tflite}-DistSimilarity.pkl', 'wb') as file:
        pickle.dump(dist_similarity, file)
    
    return dist_similarity 

我尝试这样做,但是这个 for 循环(对于 tqdm(futures) 中的 future)需要太长的时间。

import pickle
import concurrent.futures
from tqdm import tqdm
from scipy.stats import wasserstein_distance

def calculate_distance(row, column, r, c, data):
    return ((row, column), (r, c), wasserstein_distance(data[row][column], data[r][c]))

def compute_wasserstein_distances_parallel(tflite, data, rows, columns, output_filename):
    # Initialize an empty dictionary to store Wasserstein distances
    dist_similarity = {}

    # Create a ThreadPoolExecutor
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = []

        # Iterate through rows and columns of the grid
        for row in tqdm(range(rows)):
            for column in tqdm(range(columns), leave=False):
                for r in range(rows):
                    for c in range(columns):
                        # Submit the task to the executor
                        futures.append(executor.submit(calculate_distance, row, column, r, c, data))

        # Wait for all tasks to complete
        concurrent.futures.wait(futures)

        # Retrieve results from futures
        for future in tqdm(futures):
            indices, distance = future.result()[:2], future.result()[2]
            dist_similarity[indices] = distance

    print(f"[INFO] Saving pixel distribution to file ...")

    # Save the dist_similarity dictionary to a pickle file
    with open(f'{output_filename}/{tflite}-DistSimilarity.pkl', 'wb') as file:
        pickle.dump(dist_similarity, file)

    return dist_similarity

python parallel-processing neural-network
2个回答
0
投票

不久前我也做了类似的事情。就我而言,它是第一个循环中的矩阵乘法,然后是嵌套循环中的反对角平均。

我发现对两个循环都这样做是没有意义的。我所做的是对每个对角线进行并行处理,即对嵌套循环进行并行处理。

您是否考虑过使用并行和延迟的 joblib 来代替线程?


0
投票

使用 ProcessPoolExecutor。对于像这样的 CPU 密集型进程,您需要在不同的 CPU 内核上运行。

ThreadPoolExecutor 在单个处理器上运行。它主要适用于大部分时间都在等待的子流程,例如IO 绑定进程。

© www.soinside.com 2019 - 2024. All rights reserved.