我有两个numpy数组R,尺寸为S x F和W,尺寸为N x M x F。具体而言,让我们分配以下值N = 5
,M = 7
,F = 3
,S = 4
数组R包含具有S = 4
功能的样本F = 3
的集合。每行代表一个样本,每行代表一个特征。因此,R[0]
是第一个样本,R[1]
是第二个样本,然后继续。每个R[i-th]
条目都包含F
元素,为示例R[0] = np.array([1, 4, -2])
而给出。
这里是一小段代码,使用MWE初始化所有这些值
import numpy as np
# Size of Map (rows, columns)
N, M = 5, 7
# Number of features
F = 3
# Sample size
S = 4
np.random.seed(13)
R = np.random.randint(0, 10, size=(S, F))
W = np.random.randint(-4, 5, size=(N, M, F))
我们还可以看到numpy array W的给定“深度线”,作为向量,其维数也与数组R的每一行相同(这很容易注意到两个数组最后一个维度的大小)。这样我就可以访问W[2, 3]
并获得np.array([ 2, 2, -1 ])
(这里的值只是示例)。
我创建了一个简单的函数来计算给定向量r与矩阵W的每个“深度线”]的距离,并返回最近的元素W 到r
的深度线def nearest_vector_matrix_naive(r, W):
delta = np.zeros((N,M), dtype=int)
for i in range(N):
for j in range(M):
norm = 0
for k in range(F):
norm += (r[k] - W[i,j,k])**2
delta[i,j] = norm
norm = 0
win_idx = np.unravel_index(np.argmin(delta, axis=None), delta.shape)
return win_idx
当然,这是一种非常幼稚的方法,我可以进一步优化下面的代码,从而获得巨大的性能提升。
def nearest_vector_matrix(r, W):
delta = np.sum((W[:,:] - r)**2, axis=2)
return np.unravel_index(np.argmin(delta, axis=None), delta.shape)
我可以简单地使用此功能
nearest_idx = nearest_vector_matrix(R[0], W)
# Returns the nearest vector in W to R[0]
W[nearest_idx]
由于我拥有带有一堆样本的数组R,因此我使用以下代码段来计算与样本数组最接近的向量:
def nearest_samples_matrix(R, W):
DELTA = np.zeros((R.shape[0],2))
for idx, r in enumerate(R):
delta = np.sum((W[:,:] - r)**2, axis=2)
DELTA[idx] = np.unravel_index(np.argmin(delta, axis=None), delta.shape)
return DELTA
此函数返回一个数组,其中包含2个索引的S行(S是样本数)。即DELTA的形状(总是)为(S, 2)
。
[我想知道如何替换for
内部的nearest_samples_matrix
循环(例如,广播)以进一步提高代码执行性能?
我不知道该怎么做。 (除了我能够在第一种情况下做到)
对于较小尺寸的问题,dim <20或更小,通常采用a kdtree approach。关于这个话题有很多答案,例如。 one我几周前已经写过。
如果问题的范围太大,则可以切换到蛮力算法。以下两种算法都比优化方法快得多,但是在较大的输入大小和低维问题上,比kdtree方法用O(log(n))代替O(n ^ 2)慢得多。
蛮力1
下面的示例使用描述为here的算法。由于大多数计算都是在高度优化的矩阵矩阵乘法算法中完成的,因此它在大型问题上的速度非常快。缺点是较高的内存使用率(所有距离都在一个函数调用中计算)和精度问题,因为更容易出错。
import numpy as np
from sklearn.metrics.pairwise import euclidean_distances
def nearest_samples_matrix_2(R,W):
R_Temp=R
W_Temp=W.reshape(-1,W.shape[2])
dist=euclidean_distances(R_Temp, W_Temp)
ind_1,ind_2=np.unravel_index(np.argmin(dist,axis=1),shape=(W.shape[0],W.shape[1]))
return np.vstack((ind_1,ind_2)).T
蛮力2
这与您的幼稚方法非常相似,但是使用JIT编译器(Numba)可获得良好的性能。临时数组不是必需的,并且精度应该很好(只要不发生溢出)。对于较大的输入大小,还有进一步优化的空间(循环拼贴)。
import numpy as np
import numba as nb
#parallelization is only beneficial on larger input data
@nb.njit(fastmath=True,parallel=True,cache=True)
def nearest_samples_matrix_3(r, W):
ind_i=0
ind_j=0
out=np.empty((r.shape[0],2),dtype=np.int64)
for x in nb.prange(r.shape[0]):
delta=0
for k in range(W.shape[2]):
delta += (r[x,k] - W[0,0,k])**2
for i in range(W.shape[0]):
for j in range(W.shape[1]):
norm = 0
for k in range(W.shape[2]):
norm += (r[x,k] - W[i,j,k])**2
if norm < delta:
delta=norm
ind_i=i
ind_j=j
out[x,0]=ind_i
out[x,1]=ind_j
return out
Timings
#small Arrays
N, M = 100, 200
F = 30
S = 50
R = np.random.randint(0, 10, size=(S, F))
W = np.random.randint(-4, 5, size=(N, M, F))
#your function
%timeit nearest_samples_matrix(R,W)
#268 ms ± 2.94 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit nearest_samples_matrix_2(R,W)
#5.62 ms ± 22.6 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%timeit nearest_samples_matrix_3(R,W)
#3.68 ms ± 1.01 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
#larger arrays
N, M = 1_000, 2_000
F = 50
S = 100
R = np.random.randint(0, 10, size=(S, F))
W = np.random.randint(-4, 5, size=(N, M, F))
#%timeit nearest_samples_matrix_1(R,W)
#too slow
%timeit nearest_samples_matrix_2(R,W)
#2.76 s ± 17.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit nearest_samples_matrix_3(R,W)
#1.42 s ± 402 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)