我需要计算两组向量之间的成对距离,但我只需要知道这些成对距离的一小部分。当我的两组向量足够大时,这个比例将小于 0.01。
这是我当前的解决方案和一个小例子。
A
和B
是两组向量,M
存储我需要保留的向量对。 A
和 B
中的行数通常会大得多(几千)。还值得一提的是,M
每行至少有一个非零值,但某些列可能只包含零。
import numpy as np
A = np.array([[1, 2], [2, 3], [3, 4]]) # (3, 2)
B = np.array([[4, 5], [5, 6], [6, 7], [7, 8], [8, 9]]) # (5, 2)
M = np.array([[0, 0, 0, 1, 0], [1, 1, 0, 0, 0], [0, 0, 0, 0, 1]]) # (3, 5)
diff = A[:,None] - B[None,:] # (3, 5, 2)
distances = np.linalg.norm(diff, ord=2, axis=2) # (3, 5)
masked_distances = distances * M # (3, 5)
这段代码可以工作,但它计算了很多我不需要的规范,因此它执行了很多不必要的计算,特别是当
A
和 B
变得更大时。有没有更有效的方法来只计算我需要的规范?我愿意接受其他软件包的建议。
我曾考虑过使用稀疏数组来计算
masked_distances
,但是我在 scipy 中超过 2 维的稀疏数组(即 diff
)中遇到问题。
我还尝试创建矢量化函数。这也有效,但当
A
和 B
增长到拥有数千条记录时,速度会更慢。
def conditional_diff(a, b, m):
if m:
return a-b
else:
return 0
conditional_diff_vec = np.vectorize(conditional_diff, otypes=[float])
masked_diff = conditional_diff_vec(A[:,None], B[None,:], M[:,:,None])
masked_distances = norm(masked_diff, ord=2, axis=2)
我将通过使用 numba 构造一个“压缩稀疏行矩阵”来解决这个问题。 CSR 矩阵使我能够避免计算未使用的距离,并避免使用内存来表示矩阵中的许多零。 Numba 允许我构建显式循环,而不会损失太多性能。
import numba as nb
import numpy as np
import scipy
import math
A_big = np.random.rand(2000, 10)
B_big = np.random.rand(4000, 10)
M_big = np.random.rand(A_big.shape[0], B_big.shape[0]) < 0.001
@nb.njit()
def euclidean_distance(vec_a, vec_b):
acc = 0.0
for i in range(vec_a.shape[0]):
acc += (vec_a[i] - vec_b[i]) ** 2
return math.sqrt(acc)
@nb.njit()
def masked_distance_inner(data, indicies, indptr, matrix_a, matrix_b, mask):
write_pos = 0
N, M = matrix_a.shape[0], matrix_b.shape[0]
for i in range(N):
for j in range(M):
if mask[i, j]:
# Record distance value
data[write_pos] = euclidean_distance(matrix_a[i], matrix_b[j])
# Record what column this distance value should be in
indicies[write_pos] = j
write_pos += 1
# Record how many entries are in this row
indptr[i + 1] = write_pos
# Assert that we wrote to every part of un-initialized memory
assert write_pos == data.shape[0]
assert write_pos == indicies.shape[0]
# Data is returned by modifying parameters to function
def masked_distance(matrix_a, matrix_b, mask):
N, M = matrix_a.shape[0], matrix_b.shape[0]
assert mask.shape == (N, M)
# Convert mask to bool
mask = mask != 0
# How many entries will this sparse array have?
sparse_length = mask.sum()
# Set up CSR matrix
# data and indicies do not need to be zero-initialized,
# and it is faster not to initialize them
# Holds data values
data = np.empty(sparse_length, dtype='float64')
# Holds column that each data value is in
indicies = np.empty(sparse_length, dtype='int64')
# Holds the position of the elements of each row within data and indicies
indptr = np.zeros(N + 1, dtype='int64')
masked_distance_inner(data, indicies, indptr, matrix_a, matrix_b, mask)
return scipy.sparse.csr_matrix((data, indicies, indptr), shape=(N, M))
%timeit masked_distance(A_big, B_big, M_big)
一些注意事项:
np.linalg.norm(A[i] - B[i])
更快,并且仍然返回相同的结果。这就是
euclidean_distance()
功能存在的原因。
masked_distance()
负责设置算法。
masked_distance_inner()
都是对速度至关重要的东西。
13.5 ms ± 66.6 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
556 ms ± 3.74 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
确切的加速取决于所涉及矩阵的大小以及掩模的稀疏程度。例如,我使用长度为 1000 的向量对同一问题进行了基准测试,发现加速了 1000 倍。
np.allclose()
检查了该算法与原始算法的正确性。