加速Python中的稀疏交叉差异

问题描述 投票:0回答:1

我需要计算两组向量之间的成对距离,但我只需要知道这些成对距离的一小部分。当我的两组向量足够大时,这个比例将小于 0.01。

这是我当前的解决方案和一个小例子。

A
B
是两组向量,
M
存储我需要保留的向量对。
A
B
中的行数通常会大得多(几千)。还值得一提的是,
M
每行至少有一个非零值,但某些列可能只包含零。

import numpy as np

A = np.array([[1, 2], [2, 3], [3, 4]])                              # (3, 2)
B = np.array([[4, 5], [5, 6], [6, 7], [7, 8], [8, 9]])              # (5, 2)
M = np.array([[0, 0, 0, 1, 0], [1, 1, 0, 0, 0], [0, 0, 0, 0, 1]])   # (3, 5)

diff = A[:,None] - B[None,:]                                        # (3, 5, 2)
distances = np.linalg.norm(diff, ord=2, axis=2)                     # (3, 5)
masked_distances = distances * M                                    # (3, 5)

这段代码可以工作,但它计算了很多我不需要的规范,因此它执行了很多不必要的计算,特别是当

A
B
变得更大时。有没有更有效的方法来只计算我需要的规范?我愿意接受其他软件包的建议。

我曾考虑过使用稀疏数组来计算

masked_distances
,但是我在 scipy 中超过 2 维的稀疏数组(即
diff
)中遇到问题。

我还尝试创建矢量化函数。这也有效,但当

A
B
增长到拥有数千条记录时,速度会更慢。

def conditional_diff(a, b, m):
    if m:
        return a-b
    else:
        return 0

conditional_diff_vec = np.vectorize(conditional_diff, otypes=[float])
masked_diff = conditional_diff_vec(A[:,None], B[None,:], M[:,:,None])
masked_distances = norm(masked_diff, ord=2, axis=2)
python numpy scipy vectorization sparse-matrix
1个回答
0
投票

我将通过使用 numba 构造一个“压缩稀疏行矩阵”来解决这个问题。 CSR 矩阵使我能够避免计算未使用的距离,并避免使用内存来表示矩阵中的许多零。 Numba 允许我构建显式循环,而不会损失太多性能。

import numba as nb import numpy as np import scipy import math A_big = np.random.rand(2000, 10) B_big = np.random.rand(4000, 10) M_big = np.random.rand(A_big.shape[0], B_big.shape[0]) < 0.001 @nb.njit() def euclidean_distance(vec_a, vec_b): acc = 0.0 for i in range(vec_a.shape[0]): acc += (vec_a[i] - vec_b[i]) ** 2 return math.sqrt(acc) @nb.njit() def masked_distance_inner(data, indicies, indptr, matrix_a, matrix_b, mask): write_pos = 0 N, M = matrix_a.shape[0], matrix_b.shape[0] for i in range(N): for j in range(M): if mask[i, j]: # Record distance value data[write_pos] = euclidean_distance(matrix_a[i], matrix_b[j]) # Record what column this distance value should be in indicies[write_pos] = j write_pos += 1 # Record how many entries are in this row indptr[i + 1] = write_pos # Assert that we wrote to every part of un-initialized memory assert write_pos == data.shape[0] assert write_pos == indicies.shape[0] # Data is returned by modifying parameters to function def masked_distance(matrix_a, matrix_b, mask): N, M = matrix_a.shape[0], matrix_b.shape[0] assert mask.shape == (N, M) # Convert mask to bool mask = mask != 0 # How many entries will this sparse array have? sparse_length = mask.sum() # Set up CSR matrix # data and indicies do not need to be zero-initialized, # and it is faster not to initialize them # Holds data values data = np.empty(sparse_length, dtype='float64') # Holds column that each data value is in indicies = np.empty(sparse_length, dtype='int64') # Holds the position of the elements of each row within data and indicies indptr = np.zeros(N + 1, dtype='int64') masked_distance_inner(data, indicies, indptr, matrix_a, matrix_b, mask) return scipy.sparse.csr_matrix((data, indicies, indptr), shape=(N, M)) %timeit masked_distance(A_big, B_big, M_big)

一些注意事项:

    我发现在numba中计算欧氏距离比
  • np.linalg.norm(A[i] - B[i])

    更快,并且仍然返回相同的结果。这就是

    euclidean_distance()
    功能存在的原因。
    
    

  • masked_distance()

    负责设置算法。

    masked_distance_inner()
    都是对速度至关重要的东西。
    
    

  • 我将其与问题中的算法进行了基准测试,发现对于形状为 (2000, 10) 和 B (4000, 10) 的 A 输入,其中 M 有 0.1% 的元素设置为 True,速度大约快 40 倍.
  • 13.5 ms ± 66.6 µs per loop (mean ± std. dev. of 7 runs, 1 loop each) 556 ms ± 3.74 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

    确切的加速取决于所涉及矩阵的大小以及掩模的稀疏程度。例如,我使用长度为 1000 的向量对同一问题进行了基准测试,发现加速了 1000 倍。

  • 我使用
  • np.allclose()

    检查了该算法与原始算法的正确性。

    
    

© www.soinside.com 2019 - 2024. All rights reserved.