使用 Scipy 稀疏数组提高计算速度

问题描述 投票:0回答:1

我正在使用 scipy 稀疏矩阵来计算估计中使用的选择概率。下面是一个例子。怎么可能更快?

变量和参数:

import itertools
import numpy as np
import scipy as sp
import timeit

T = 70000
N = 200
p = 212

X = sp.sparse.random(T, (N)*p, density=0.018, format='csr')
B = np.random.rand(p,)
I = sp.sparse.eye(N,format="csr")

计算选择概率的函数:

def csr_example(X,B,I,tr):
    tem0 = sp.sparse.kron(I, B, format = "csr").transpose()
    exp_xb = X@tem0
    exp_xb.data = np.exp( exp_xb.data, out=exp_xb.data)
    exp_xb0 = sp.sparse.csr_matrix.sum(exp_xb,axis=1)
    exp_xb0 = np.reshape(exp_xb0, (tr, 1))
    pro = exp_xb/exp_xb0
    return pro

执行时间:

print(timeit.timeit(lambda: csr_example(X,B,I,T), setup="pass",number=10))

约15秒

numpy scipy sparse-matrix
1个回答
0
投票

是的,您可以尝试一些想法来加快速度。

我通过在线路分析器下运行你的程序开始分析你的程序。

Timer unit: 1e-09 s

Total time: 0.688771 s
File: /tmp/ipykernel_4391/4040554224.py
Function: csr_example at line 1

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
     1                                           def csr_example(X,B,I,tr):
     2         1    2630726.0    3e+06      0.4      tem0 = sp.sparse.kron(I, B, format = "csr").transpose()
     3         1  574464285.0    6e+08     83.4      exp_xb = X@tem0
     4         1   38584926.0    4e+07      5.6      exp_xb.data = np.exp( exp_xb.data, out=exp_xb.data)
     5         1    3826251.0    4e+06      0.6      exp_xb0 = sp.sparse.csr_matrix.sum(exp_xb,axis=1)
     6         1      13541.0  13541.0      0.0      exp_xb0 = np.reshape(exp_xb0, (tr, 1))
     7         1   69250761.0    7e+07     10.1      pro = exp_xb/exp_xb0
     8         1        287.0    287.0      0.0      return pro

这表明程序 83% 的时间花费在 X 和 tem0 之间的矩阵乘法上。一般来说,SciPy 使用最快的已知算法在任意矩阵之间进行矩阵乘法。除非您可以利用输入矩阵的某些属性,否则您无法走得更快。

有用的是,该乘法的右侧是单位矩阵和 B(一个一维数组)的转置 Kroneker 乘积。

如果 B 设置为

[X, Y, Z]
,这个克罗内克产品将如下所示:

X 0 0
Y 0 0
Z 0 0
0 X 0
0 Y 0
0 Z 0
0 0 X
0 0 Y
0 0 Z

这有一些不错的特性:

  • 在 Kroneker 产品完成之前,B 相当小。如果数据很小,就可以节省内存访问。
  • X 的每个元素最多与矩阵乘积的单个元素相关。具体来说,如果 X 索引为
    row, col
    ,则该元素将
    B[col % p] * X[row, col]
    添加到
    row, col // p
    处的元素。

考虑到这一点,我编写了一个矩阵乘法函数,它计算 X @ (I ⊗ B) 而无需显式计算 (I ⊗ B)。

import numpy as np
import scipy as sp
import timeit
import numba as nb


def csr_example(X,B,I,tr):
    tem0 = sp.sparse.kron(I, B, format = "csr").transpose()
    exp_xb = X@tem0
    exp_xb.data = np.exp( exp_xb.data, out=exp_xb.data)
    exp_xb0 = sp.sparse.csr_matrix.sum(exp_xb,axis=1)
    exp_xb0 = np.reshape(exp_xb0, (tr, 1))
    pro = exp_xb/exp_xb0
    return pro

    
def naive_mm(X, B, N):
    tem0 = sp.sparse.kron(I, B, format = "csr").transpose()
    exp_xb = X @ tem0
    return exp_xb


@nb.njit()
def numba_mm_inner(Xindptr, Xindices, Xdata, B, N, out):
    p = B.shape[0]
    # Loop over rows
    for i in range(len(Xindptr) - 1):
        # Get all column numbers and data for this row
        ind_begin = Xindptr[i]
        ind_end = Xindptr[i + 1]
        cols = Xindices[ind_begin:ind_end]
        data = Xdata[ind_begin:ind_end]
        # Where is the beginning of B located with respect to
        # the beginning of X?
        col_offset = 0
        # The column of the output array to write to
        out_col = 0
        # The running total for this element
        sum_ = 0
        for j in range(len(cols)):
            in_col = cols[j]
            while not in_col < col_offset + p:
                # None of the remaining X values wil be relevant
                # to this column
                out[i, out_col] = sum_
                sum_ = 0
                col_offset += p
                out_col += 1
            
            assert in_col >= col_offset
            sum_ += data[j] * (B[in_col - col_offset])
        # Write any remaining sum
        out[i, out_col] = sum_

    
def numba_mm(X, B, N):
    X = X.tocsr()
    out = np.zeros((X.shape[0], N))
    assert X.shape[0] == out.shape[0]
    assert X.shape[1] == N * B.shape[0]
    numba_mm_inner(X.indptr, X.indices, X.data, B, N, out)
    return out


def csr_opt(X,B,N,tr):
    exp_xb = numba_mm(X, B, N)
    np.exp(exp_xb, out=exp_xb, where=exp_xb != 0)
    exp_xb0 = np.sum(exp_xb,axis=1)
    exp_xb0 = np.reshape(exp_xb0, (tr, 1))
    pro = exp_xb/exp_xb0
    return pro


def main():
    T = 30000
    N = 200
    p = 212

    X = sp.sparse.random(T, (N)*p, density=0.018, format='csr')
    B = np.random.rand(p,)  
    I = sp.sparse.eye(N,format="csr")

    old = csr_example(X,B,I,T).toarray()
    new = csr_opt(X,B,N,T)
    print("Methods equal:", np.allclose(old, new))
    time_old = timeit.timeit(lambda: csr_example(X,B,I,T),number=20)
    time_new = timeit.timeit(lambda: csr_opt(X,B,N,T),number=20)
    print(f"old {time_old:.4f}")
    print(f"new {time_new:.4f}")
    print(f"ratio {time_old / time_new:.4f}")


main()

备注:

  • naive_mm
    numba_mm
    是矩阵乘法的两种实现:都计算 X @ (I ⊗ B),并且旨在获得相同的结果。
    naive_mm
    未被实际代码使用 - 它只是用于比较。
  • numba_mm
    输出密集数组作为输出。我注意到这一步的输出只有 2% 的稀疏度,所以稀疏矩阵在这之后并不能真正帮助你。
© www.soinside.com 2019 - 2024. All rights reserved.