我正在使用 scipy 稀疏矩阵来计算估计中使用的选择概率。下面是一个例子。怎么可能更快?
变量和参数:
import itertools
import numpy as np
import scipy as sp
import timeit
T = 70000
N = 200
p = 212
X = sp.sparse.random(T, (N)*p, density=0.018, format='csr')
B = np.random.rand(p,)
I = sp.sparse.eye(N,format="csr")
计算选择概率的函数:
def csr_example(X,B,I,tr):
tem0 = sp.sparse.kron(I, B, format = "csr").transpose()
exp_xb = X@tem0
exp_xb.data = np.exp( exp_xb.data, out=exp_xb.data)
exp_xb0 = sp.sparse.csr_matrix.sum(exp_xb,axis=1)
exp_xb0 = np.reshape(exp_xb0, (tr, 1))
pro = exp_xb/exp_xb0
return pro
执行时间:
print(timeit.timeit(lambda: csr_example(X,B,I,T), setup="pass",number=10))
约15秒
是的,您可以尝试一些想法来加快速度。
我通过在线路分析器下运行你的程序开始分析你的程序。
Timer unit: 1e-09 s
Total time: 0.688771 s
File: /tmp/ipykernel_4391/4040554224.py
Function: csr_example at line 1
Line # Hits Time Per Hit % Time Line Contents
==============================================================
1 def csr_example(X,B,I,tr):
2 1 2630726.0 3e+06 0.4 tem0 = sp.sparse.kron(I, B, format = "csr").transpose()
3 1 574464285.0 6e+08 83.4 exp_xb = X@tem0
4 1 38584926.0 4e+07 5.6 exp_xb.data = np.exp( exp_xb.data, out=exp_xb.data)
5 1 3826251.0 4e+06 0.6 exp_xb0 = sp.sparse.csr_matrix.sum(exp_xb,axis=1)
6 1 13541.0 13541.0 0.0 exp_xb0 = np.reshape(exp_xb0, (tr, 1))
7 1 69250761.0 7e+07 10.1 pro = exp_xb/exp_xb0
8 1 287.0 287.0 0.0 return pro
这表明程序 83% 的时间花费在 X 和 tem0 之间的矩阵乘法上。一般来说,SciPy 使用最快的已知算法在任意矩阵之间进行矩阵乘法。除非您可以利用输入矩阵的某些属性,否则您无法走得更快。
有用的是,该乘法的右侧是单位矩阵和 B(一个一维数组)的转置 Kroneker 乘积。
如果 B 设置为
[X, Y, Z]
,这个克罗内克产品将如下所示:
X 0 0
Y 0 0
Z 0 0
0 X 0
0 Y 0
0 Z 0
0 0 X
0 0 Y
0 0 Z
这有一些不错的特性:
row, col
,则该元素将 B[col % p] * X[row, col]
添加到 row, col // p
处的元素。考虑到这一点,我编写了一个矩阵乘法函数,它计算 X @ (I ⊗ B) 而无需显式计算 (I ⊗ B)。
import numpy as np
import scipy as sp
import timeit
import numba as nb
def csr_example(X,B,I,tr):
tem0 = sp.sparse.kron(I, B, format = "csr").transpose()
exp_xb = X@tem0
exp_xb.data = np.exp( exp_xb.data, out=exp_xb.data)
exp_xb0 = sp.sparse.csr_matrix.sum(exp_xb,axis=1)
exp_xb0 = np.reshape(exp_xb0, (tr, 1))
pro = exp_xb/exp_xb0
return pro
def naive_mm(X, B, N):
tem0 = sp.sparse.kron(I, B, format = "csr").transpose()
exp_xb = X @ tem0
return exp_xb
@nb.njit()
def numba_mm_inner(Xindptr, Xindices, Xdata, B, N, out):
p = B.shape[0]
# Loop over rows
for i in range(len(Xindptr) - 1):
# Get all column numbers and data for this row
ind_begin = Xindptr[i]
ind_end = Xindptr[i + 1]
cols = Xindices[ind_begin:ind_end]
data = Xdata[ind_begin:ind_end]
# Where is the beginning of B located with respect to
# the beginning of X?
col_offset = 0
# The column of the output array to write to
out_col = 0
# The running total for this element
sum_ = 0
for j in range(len(cols)):
in_col = cols[j]
while not in_col < col_offset + p:
# None of the remaining X values wil be relevant
# to this column
out[i, out_col] = sum_
sum_ = 0
col_offset += p
out_col += 1
assert in_col >= col_offset
sum_ += data[j] * (B[in_col - col_offset])
# Write any remaining sum
out[i, out_col] = sum_
def numba_mm(X, B, N):
X = X.tocsr()
out = np.zeros((X.shape[0], N))
assert X.shape[0] == out.shape[0]
assert X.shape[1] == N * B.shape[0]
numba_mm_inner(X.indptr, X.indices, X.data, B, N, out)
return out
def csr_opt(X,B,N,tr):
exp_xb = numba_mm(X, B, N)
np.exp(exp_xb, out=exp_xb, where=exp_xb != 0)
exp_xb0 = np.sum(exp_xb,axis=1)
exp_xb0 = np.reshape(exp_xb0, (tr, 1))
pro = exp_xb/exp_xb0
return pro
def main():
T = 30000
N = 200
p = 212
X = sp.sparse.random(T, (N)*p, density=0.018, format='csr')
B = np.random.rand(p,)
I = sp.sparse.eye(N,format="csr")
old = csr_example(X,B,I,T).toarray()
new = csr_opt(X,B,N,T)
print("Methods equal:", np.allclose(old, new))
time_old = timeit.timeit(lambda: csr_example(X,B,I,T),number=20)
time_new = timeit.timeit(lambda: csr_opt(X,B,N,T),number=20)
print(f"old {time_old:.4f}")
print(f"new {time_new:.4f}")
print(f"ratio {time_old / time_new:.4f}")
main()
备注:
naive_mm
和 numba_mm
是矩阵乘法的两种实现:都计算 X @ (I ⊗ B),并且旨在获得相同的结果。 naive_mm
未被实际代码使用 - 它只是用于比较。numba_mm
输出密集数组作为输出。我注意到这一步的输出只有 2% 的稀疏度,所以稀疏矩阵在这之后并不能真正帮助你。