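I am computing Wasserstein distances with the function below, which uses SciPy's `wasserstein_distance`. The problem is that each `data[row][column]` can be very large (from about 60k to 300k values), and the computation takes a long time. Is there a way to parallelize it so that each processor of the machine is "assigned" a specific range of elements to process? For example, the first processor could compute the distances between the first 24 pixels, (0,0) to (0,23), and all pixels (0,0) to (23,23); the second processor would compute the distances between the next 24 pixels, (1,0) to (1,23), and all pixels (0,0) to (23,23). Any other suggestions are welcome: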
```python
import pickle

from scipy.stats import wasserstein_distance
from tqdm import tqdm


def compute_wasserstein_distances(tflite, data, rows, columns, output_filename):
    # Initialize an empty dictionary to store Wasserstein distances
    dist_similarity = {}
    # Iterate through rows and columns of the grid
    for row in tqdm(range(0, rows)):
        for column in tqdm(range(0, columns), leave=False):
            for r in range(0, rows):
                for c in range(0, columns):
                    # Calculate the Wasserstein distance between the distributions
                    distance = wasserstein_distance(data[row][column], data[r][c])
                    # Store the distance in the dictionary with corresponding indices
                    dist_similarity[((row, column), (r, c))] = distance
    print("[INFO] Saving pixel distribution to file ...")
    # Save the dist_similarity dictionary to a pickle file
    with open(f'{output_filename}/{tflite}-DistSimilarity.pkl', 'wb') as file:
        pickle.dump(dist_similarity, file)
    return dist_similarity
```
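Separately from parallelism, the loop above computes every pair twice and every pixel against itself. The 1-D Wasserstein distance is symmetric and is zero between identical samples, so only pairs with i < j need to be computed: for a 24 × 24 grid (the size implied by the pixel ranges in the question) that is 576 · 575 / 2 = 165,600 calls instead of 576² = 331,776. The sketch below uses the hypothetical name `pairwise_wasserstein_symmetric`, assumes the same `data[row][column]` layout as the question, and stores the results in an n × n NumPy array (pixel `(row, column)` at flat index `row * columns + column`) instead of a dict keyed by tuples; that flat-index layout is an illustrative choice, not part of the original code.

```python
import numpy as np
from scipy.stats import wasserstein_distance


def pairwise_wasserstein_symmetric(data, rows, columns):
    """Return an (n, n) matrix of Wasserstein distances between all pixels.

    Hypothetical helper. Pixel (row, column) maps to flat index
    row * columns + column. Only pairs with i < j are computed; the rest
    follow from symmetry, W(a, b) == W(b, a), and W(a, a) == 0.
    """
    # Flatten the grid once so each pixel is addressed by a single index.
    pixels = [np.asarray(data[row][column]) for row in range(rows) for column in range(columns)]
    n = len(pixels)
    # The diagonal stays at 0 because a sample's distance to itself is 0.
    dist = np.zeros((n, n), dtype=np.float64)
    for i in range(n):
        for j in range(i + 1, n):
            d = wasserstein_distance(pixels[i], pixels[j])
            # Fill both triangles from a single computation.
            dist[i, j] = d
            dist[j, i] = d
    return dist
```

A distance can then be looked up as `dist[r1 * columns + c1, r2 * columns + c2]`, and the matrix can be saved with `np.save` instead of pickling a large dict.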
I tried the following, but the loop `for future in tqdm(futures)` takes far too long:
```python
import pickle
import concurrent.futures

from scipy.stats import wasserstein_distance
from tqdm import tqdm


def calculate_distance(row, column, r, c, data):
    return (row, column), (r, c), wasserstein_distance(data[row][column], data[r][c])


def compute_wasserstein_distances_parallel(tflite, data, rows, columns, output_filename):
    # Initialize an empty dictionary to store Wasserstein distances
    dist_similarity = {}
    # Create a ThreadPoolExecutor
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = []
        # Iterate through rows and columns of the grid
        for row in tqdm(range(rows)):
            for column in tqdm(range(columns), leave=False):
                for r in range(rows):
                    for c in range(columns):
                        # Submit one task per pixel pair to the executor
                        futures.append(executor.submit(calculate_distance, row, column, r, c, data))
        # Wait for all tasks to complete
        concurrent.futures.wait(futures)
    # Retrieve results from futures
    for future in tqdm(futures):
        source, target, distance = future.result()
        dist_similarity[(source, target)] = distance
    print("[INFO] Saving pixel distribution to file ...")
    # Save the dist_similarity dictionary to a pickle file
    with open(f'{output_filename}/{tflite}-DistSimilarity.pkl', 'wb') as file:
        pickle.dump(dist_similarity, file)
    return dist_similarity
```
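Independently of the executor, a large share of the cost is likely the sorting inside each call: for unweighted samples, SciPy's implementation sorts both inputs on every call, so in the full double loop each pixel's samples are sorted 1,152 times (576 times as the first argument and 576 times as the second). One possible optimization, sketched below under the assumption that no weights are used (as in the question), is to sort every pixel's samples once and evaluate the same CDF-difference formula on the pre-sorted arrays. `wasserstein_presorted` is a hypothetical helper, not a SciPy function, and it is worth checking against `scipy.stats.wasserstein_distance` on your own data before relying on it.

```python
import numpy as np
from scipy.stats import wasserstein_distance


def wasserstein_presorted(u_sorted, v_sorted):
    """1-D Wasserstein distance between two unweighted, already-sorted samples.

    Hypothetical helper: integrates |F_u - F_v| over the pooled sample values,
    the formula scipy.stats.wasserstein_distance uses without weights, but
    without re-sorting u and v on every call.
    """
    # Pool both samples; the inputs are two already-sorted runs.
    all_values = np.concatenate((u_sorted, v_sorted))
    all_values.sort(kind="stable")
    # Widths of the intervals between consecutive pooled values.
    deltas = np.diff(all_values)
    # Empirical CDFs of u and v on each of those intervals.
    u_cdf = np.searchsorted(u_sorted, all_values[:-1], side="right") / u_sorted.size
    v_cdf = np.searchsorted(v_sorted, all_values[:-1], side="right") / v_sorted.size
    return float(np.sum(np.abs(u_cdf - v_cdf) * deltas))


rng = np.random.default_rng(0)
u, v = rng.normal(size=1000), rng.exponential(size=800)
# Sort each sample once up front, then reuse the sorted arrays for every pair.
u_sorted, v_sorted = np.sort(u), np.sort(v)
print(wasserstein_presorted(u_sorted, v_sorted), wasserstein_distance(u, v))
```

In the pixel grid, this means building a list of `np.sort(...)` arrays once and calling the helper on those for every pair; it combines with either the symmetric loop or a process-based pool.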
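I did something similar a while ago. In my case there was a matrix multiplication in the outer loop, followed by anti-diagonal averaging in the nested loop.
I found that parallelizing both loops was pointless. What I did instead was process each diagonal in parallel, i.e. parallelize only the nested loop.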
Have you considered using joblib's `Parallel` and `delayed` instead of threads?
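A minimal sketch of the joblib suggestion, assuming joblib is installed (it is a separate package). Following the idea of parallelizing only one loop level, it creates one task per grid row, as proposed in the question: each task computes the distances from that row's pixels to all pixels, instead of one task per pixel pair. `distances_for_grid_row` and `pairwise_wasserstein_joblib` are hypothetical names. joblib's default `loky` backend runs tasks in separate worker processes, so the CPU-bound work is not serialized by the GIL.

```python
import numpy as np
from joblib import Parallel, delayed
from scipy.stats import wasserstein_distance


def distances_for_grid_row(pixels, row, columns):
    """Distances from the `columns` pixels of grid row `row` to every pixel."""
    block = np.empty((columns, len(pixels)))
    for k in range(columns):
        source = pixels[row * columns + k]
        for j, target in enumerate(pixels):
            block[k, j] = wasserstein_distance(source, target)
    return block


def pairwise_wasserstein_joblib(data, rows, columns, n_jobs=-1):
    """Return an (n, n) matrix; pixel (row, column) has flat index row * columns + column."""
    pixels = [np.asarray(data[r][c]) for r in range(rows) for c in range(columns)]
    # One task per grid row; Parallel returns the results in task order.
    blocks = Parallel(n_jobs=n_jobs)(
        delayed(distances_for_grid_row)(pixels, row, columns) for row in range(rows)
    )
    # Stacking the per-row blocks in order gives the full matrix.
    return np.vstack(blocks)
```

With `n_jobs=-1`, joblib uses all available cores. Coarse tasks like these keep the scheduling and serialization overhead small compared with submitting hundreds of thousands of tiny tasks.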
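Use ProcessPoolExecutor. For a CPU-bound task like this, the work has to run on separate CPU cores.
ThreadPoolExecutor keeps all its threads in one Python process, and in the standard CPython build the global interpreter lock (GIL) lets only one thread execute Python bytecode at a time, so CPU-bound work gains little or nothing from it. Threads are mainly suited to tasks that spend most of their time waiting, such as I/O-bound work.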
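A minimal sketch of the ProcessPoolExecutor approach, again with one task per grid row as proposed in the question. It assumes `data[row][column]` holds one 1-D sample per pixel, and the helper names are hypothetical. The pixel data is handed to each worker process once through the executor's `initializer`/`initargs` parameters rather than being pickled with every task. On platforms that start workers with `spawn` (Windows, macOS), the calling code must sit under an `if __name__ == "__main__":` guard and the worker functions must be defined at the top level of a module.

```python
import numpy as np
from concurrent.futures import ProcessPoolExecutor
from scipy.stats import wasserstein_distance

_PIXELS = None  # set once in each worker process by _init_worker


def _init_worker(pixels):
    # Runs once per worker, so the data is transferred per process, not per task.
    global _PIXELS
    _PIXELS = pixels


def _row_block(row, columns):
    """Distances from the `columns` pixels of grid row `row` to all pixels."""
    n = len(_PIXELS)
    block = np.empty((columns, n))
    for k in range(columns):
        source = _PIXELS[row * columns + k]
        for j in range(n):
            block[k, j] = wasserstein_distance(source, _PIXELS[j])
    return block


def pairwise_wasserstein_processes(data, rows, columns, max_workers=None):
    """Return an (n, n) matrix; pixel (row, column) has flat index row * columns + column."""
    pixels = [np.asarray(data[r][c]) for r in range(rows) for c in range(columns)]
    with ProcessPoolExecutor(max_workers=max_workers,
                             initializer=_init_worker,
                             initargs=(pixels,)) as executor:
        # executor.map yields results in submission order: grid row 0, 1, ...
        blocks = executor.map(_row_block, range(rows), [columns] * rows)
        return np.vstack(list(blocks))


if __name__ == "__main__":
    rng = np.random.default_rng(0)
    grid = [[rng.normal(size=1000) for _ in range(4)] for _ in range(3)]
    dist = pairwise_wasserstein_processes(grid, rows=3, columns=4)
    print(dist.shape)  # (12, 12)
```

For a 24 × 24 grid this creates 24 tasks, so with fewer than 24 cores every core stays busy; skipping the lower triangle (using symmetry) can be added on top by having each task compute only `j >= row * columns + k`.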