我构建了一个脚本,用于在循环中查找当前索引处最近的 8 个索引;本质上是一个移动窗口算法。并行组件需要跨多个二维数组执行此操作,这些数组的总量可能不同,但始终具有相同的维度(比如 2800i x 1200j)。我在下面用 12 测试了这个脚本,所有数组的数据类型都是 Float32,最大小数精度为 8。
首先,让我们从脚本的最近邻部分开始,如下:
import numpy as np
import multiprocessing as mpr
def get_neighbors(arr, origin, num_neighbors = 8):
coords = np.array([[i,j] for (i,j),value in np.ndenumerate(arr)]).reshape(arr.shape + (2,))
distances = np.linalg.norm(coords - origin, axis = -1)
neighbor_limit = np.sort(distances.ravel())[num_neighbors]
window = np.where(distances <= neighbor_limit)
exclude_window = np.where(distances > neighbor_limit)
return window, exclude_window, distances
我已经构建了一个静态数组(名为
gridranger
)来处理移动窗口索引循环,并用于为所有其他二维数组提供窗口索引坐标,前提是所有其他数组的大小都相同。
该脚本的目标是将所有数组中移动窗口中索引处的所有值提取到一个列表中,然后执行一些分析。这部分看起来如下;请注意,变量 grids
包含映射到每个相应二维数组的所有变量的名称:
def extractor(queue, gridin, windowin):
extract_values = []
for i in range(0, len(windowin[0])):
extract_values.append(gridin[windowin[0][i], windowin[1][i]])
queue.put(extract_values)
def parallel():
for index, val in np.ndenumerate(gridranger):
window, exclude, distances = get_neighbors(gridranger, [index[0], index[1]])
outarr = np.column_stack((window[0], window[1]))
outvalues, processes = [], []
q = mpr.Queue()
for grid in grids:
pro = mpr.Process(target=extractor, args=(q, grid, window))
processes.extend([pro])
pro.start()
for p in processes:
extract_values = q.get()
outvalues.append(extract_values)
for p in processes:
p.join()
# return outvalues
print(index, outvalues)
我遇到的问题是使用多进程运行它的时间长度,平均为 ~7.5-8.5 秒。这对于像我运行这个移动窗口的大型二维阵列来说显然是完全低效的。我可以采取哪些步骤来大幅减少此运行时间?
我不能说并行化,但是你的
get_neighbors
函数正在做大量不必要的工作。每次创建新坐标数组时,对距原点的距离进行完整排序,并针对包含和排除索引遍历数组两次。以下函数仍然重复一些工作——例如,可以将对 ogrid 的调用移到函数之外——但它会将运行时间缩短大约 100
def get_neighbors_vectorized(arr, origin, num_neighbors = 8):
n, m = arr.shape
# make the coordinates as a (1,n) shaped array and an (m,) shaped array
xcoord, ycoord = np.ogrid[0:n, 0:m]
# the squared distances from the origin are broadcast together by the +
distances = np.sqrt((xcoord - origin[0])**2 + (ycoord - origin[1])**2)
# argpartition returns an array where arr[1...k] are the indices of the first k sorted values. This replaces the sort and where calls in your original function
partition = np.argpartition(distances, num_neighbors, axis=None)
# convert from the raveled array indices back to 2d
window = (partition[:num_neighbors] // m, partition[:num_neighbors] % m)
# only the window itself seems to be used in the subsequent functions
return window
ipython3 中的基准测试
In [2]: arr = np.arange(20000).reshape(200, 100)
In [3]: %timeit get_neighbors(arr, [12, 4])
7.36 ms ± 7.14 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
In [4]: %timeit get_neighbors_vectorized(arr, [12, 4])
89.2 µs ± 64.1 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
我不知道例程的最终目的是什么,但看起来你只是在做某种移动窗口,这意味着你可以完全替换
get_neighbors
。