如何在 Python 中优化和加速这个矩阵乘法

问题描述 投票:0回答:1

根据梯度方程,矩阵乘法由下式给出

同时需要

@
*
的地方。如果读者有兴趣,这里是代码:

# parameters
beta     =  0.98 
alpha    =  0.03
delta    =  0.1
T        =  1000
loop     =  1
dif      =  1
tol      =  1e-8

kss      =  ((1 / beta - (1 - delta)) / alpha)**(1 / (alpha - 1))
k        =  np.linspace(0.5 * kss, 1.8 * kss, T)

k_reshaped   =  k.reshape(-1, 1)
c            =  k_reshaped ** alpha + (1 - delta) * k_reshaped - k
c[c<0]       =  1e-11
c            =  np.log(c)
beta_square  =  beta**2

# multiplication
I   =  np.identity(T)
E   =  np.ones(T)[:,None]
Q2  =  I

while np.any(dif > tol) and loop < 200:
    J   =  beta * Q2
    B   =  inv(I - J)

    Q3  =  np.zeros([T,T])
    ini =  np.argmax(c + (B @ (J * c) @ E).flatten(),axis=1)
    Q3[np.arange(T),ini]  =  1


    gB  =  2 * B @ (J * c @ E) @ (beta * Q2 * c @ E + B @ (np.linalg.matrix_power(I - J, 2) * c @ E)).T / beta_square
    B   += 0.1 * gB

    dif  =  np.max(np.absolute(Q3 - Q2))
    kcQ  =  k[ini]

    Q2   =  Q3
    loop += 1

基本上是遵循随机梯度下降算法,矩阵

B
B = inv(I - J)
初始化,由
B += 0.1 * gB
演化,
J
Q2
变化,每次迭代需要确定
Q2
.然而
Q2
是一个稀疏矩阵,每列只有一个,其余为零,在代码中是这样的:

ini =  np.argmax(c + (B @ (J * c) @ E).flatten(),axis=1)
Q3[np.arange(T),ini]  =  1
...
Q2  =  Q3

代码目前演示了 1000 x 1000 的矩阵运算,是否可以对其进行优化并运行得更快?

python numpy matrix-multiplication
1个回答
2
投票

这里有一些改进:

  • beta * Q2
    被计算了两次,第二次可以用
    J
    代替。
  • J * c
    也可以多次计算,但可以一次完成。
    I - J
    也一样。
  • B @ (J * c) @ E
    B @ (J * c @ E)
    在数学上是等效的,但后者在您的情况下更快,也可以计算一次。
  • CPython(几乎)没有优化任何东西,Numpy 急切地执行操作,所以这样做
    0.1 * (2 * Matrix / beta_square)
    实际上计算一个新矩阵
    M2 = 2 * Matrix
    ,然后是一个新矩阵
    M3 = M2 / beta_square
    ,然后是另一个
    M4 = 0.1 * M4
    。创建许多像这样的临时矩阵是昂贵的,因为它是一个内存绑定操作,内存带宽在现代机器上非常有限(与计算能力相比),更不用说填充新的临时数组通常比已经填充的临时数组慢(因为虚拟内存,更具体地说是页面错误)。因此,执行
    (0.1 * 2 / beta_square) * Matrix
    更快(因为乘以浮点数比乘以大矩阵快得多)。
  • 使用 Numba 可以轻松加速一些基本操作,例如
    np.argmax(c + tmp3.flatten(), axis=1)
    np.max(np.absolute(Q3 - Q2))
    。事实上,大多数
  • 就地操作通常比异地操作更快(同样,因为昂贵的临时数组)。由于基本功能的
    out
    参数(例如
    np.multiply(A, B, out=C)
    ),您可以使用它们。话虽这么说,这里的好处很小,因为
    inv
    需要很长时间。
  • 假设在循环结束时不需要
    B
    ,您可以使用
    np.linalg.solve
    代替,如Homer512所述。对于大型矩阵(
    O(n**3)
    对比
    O(n**2)
    ),求解系统的速度明显更快,而且通常更准确。请参阅不要反转矩阵。例如,
    inv(I-J) @ b
    可以替换为
    solve(I-J, b)
    。使用
    solve
    的好处并不是那么大,尽管在您的特定用例中,因为稀疏
    I-J
    矩阵。
  • 如果真正使用
    B
    ,在循环的最后,那么这个就复杂一点。 Numba 可以帮助编写一个相对快速的矩阵求逆,专门针对您的用例中的稀疏矩阵(因为 Scipy 中的一个非常慢)。
  • np.linalg.matrix_power(tmp0, 2) * c
    也可以在 Numba 中针对稀疏矩阵进行优化。

这里是一个(几乎没有测试过的)使用 Numba 的实现:

@nb.njit('(float64[:,::1], float64[::1])', parallel=True)
def compute_ini(a, b):
    n, m = a.shape
    assert b.size == m and m > 0
    res = np.empty(n, np.int64)
    for i in nb.prange(n):
        max_val, max_pos = a[i, 0] + b[0], 0
        for j in range(1, m):
            val = a[i, j] + b[j]
            if val > max_val:
                max_val = val
                max_pos = j
        res[i] = max_pos
    return res

@nb.njit('(float64[:,::1], float64[:,::1])', parallel=True)
def max_abs_diff(a, b):
    return np.max(np.absolute(a - b))

# Utility function for invert_sparse_matrix
@nb.njit
def invert_sparse_matrix_subkernel(b, out, i1, n, eps):
    for i2 in range(n):
        if i2 != i1:
            scale = b[i2, i1]
            if abs(scale) >= eps:
                for j in range(n):
                    b[i2, j] -= scale * b[i1, j]
                    out[i2, j] -= scale * out[i1, j]

@nb.njit('(float64[:,::1], float64[:,::1])', parallel=True)
def invert_sparse_matrix(a, out):
    eps = 1e-14
    n, m = a.shape
    assert n == m and out.shape == (n,n)

    b = np.empty((n, n))

    for i in nb.prange(n):
        out[i, :i] = 0.0
        out[i, i] = 1.0
        out[i, i+1:] = 0.0
        b[i, :] = a[i, :]

    for i1 in range(n):
        scale = 1.0 / b[i1, i1]
        if abs(scale) < eps:
            b[i1, :].fill(0.0)
            out[i1, :].fill(0.0)
            invert_sparse_matrix_subkernel(b, out, i1, n, eps)
        elif abs(scale-1.0) < eps:
            invert_sparse_matrix_subkernel(b, out, i1, n, eps)
        else:
            b[i1, :] *= scale
            out[i1, :] *= scale
            invert_sparse_matrix_subkernel(b, out, i1, n, eps)

@nb.njit('(float64[:,::1], float64[:,::1], float64[:,::1])', parallel=True)
def sparse_square_premult(a, premult, out):
    eps = 1e-14
    n, m = a.shape
    assert n == m
    assert premult.shape == (n, n)
    assert out.shape == (n, n)

    for i in nb.prange(n):
        out[i, :] = 0.0
        for j in range(n):
            if abs(a[i, j]) >= eps:
                for k in range(n):
                    out[i, k] += a[i, j] * a[j, k]
        out[i, :] *= premult[i, :]

def compute_numba():
    # parameters
    beta     =  0.98 
    alpha    =  0.03
    delta    =  0.1
    T        =  1000
    loop     =  1
    dif      =  1
    tol      =  1e-8

    kss      =  ((1 / beta - (1 - delta)) / alpha)**(1 / (alpha - 1))
    k        =  np.linspace(0.5 * kss, 1.8 * kss, T)

    k_reshaped   =  k.reshape(-1, 1)
    c            =  k_reshaped ** alpha + (1 - delta) * k_reshaped - k
    c[c<0]       =  1e-11
    c            =  np.log(c)
    beta_square  =  beta**2

    # multiplication
    I   =  np.identity(T)
    E   =  np.ones(T)[:,None]
    Q2  =  I

    J = np.empty((T, T))
    tmp0 = np.empty((T, T))
    tmp1 = np.empty((T, T))
    tmp2 = np.empty((T, 1))
    tmp3 = np.empty((T, 1))
    tmp4 = np.empty((T, T))
    B = np.empty((T, T))

    while np.any(dif > tol) and loop < 200:
        np.multiply(beta, Q2, out=J)
        np.subtract(I, J, out=tmp0)
        invert_sparse_matrix(tmp0, B)

        Q3 = np.zeros((T,T))
        np.multiply(J, c, out=tmp1)
        np.matmul(tmp1, E, out=tmp2)
        np.matmul(B, tmp2, out=tmp3)
        ini = compute_ini(c, tmp3.flatten())
        Q3[np.arange(T), ini] = 1

        factor = 0.1 * 2 / beta_square
        sparse_square_premult(tmp0, c, tmp4)
        np.add(B, (factor * tmp3) @ (tmp2 + B @ (tmp4 @ E)).T, out=B)

        dif = max_abs_diff(Q3, Q2)
        kcQ = k[ini]

        Q2 = Q3
        loop += 1

compute_numba()

请注意,

invert_sparse_matrix
函数仍然需要很长时间(在我的机器上 >50%),尽管它比
inv
快大约 3 倍,和
solve
一样快。它是对 naive inversion algorithm 的改进,对非常稀疏的矩阵几乎没有优化。它当然可以进一步优化(例如使用平铺),但这肯定不是一件容易的事(特别是对于新手程序员)。

注意编译时间需要几秒钟。

总的来说,这个实现比我的 i5-9600KF 处理器(6 核)上的初始实现快 4~5 倍。

© www.soinside.com 2019 - 2024. All rights reserved.