同时需要
@
和*
的地方。如果读者有兴趣,这里是代码:
# parameters
beta = 0.98
alpha = 0.03
delta = 0.1
T = 1000
loop = 1
dif = 1
tol = 1e-8
kss = ((1 / beta - (1 - delta)) / alpha)**(1 / (alpha - 1))
k = np.linspace(0.5 * kss, 1.8 * kss, T)
k_reshaped = k.reshape(-1, 1)
c = k_reshaped ** alpha + (1 - delta) * k_reshaped - k
c[c<0] = 1e-11
c = np.log(c)
beta_square = beta**2
# multiplication
I = np.identity(T)
E = np.ones(T)[:,None]
Q2 = I
while np.any(dif > tol) and loop < 200:
J = beta * Q2
B = inv(I - J)
Q3 = np.zeros([T,T])
ini = np.argmax(c + (B @ (J * c) @ E).flatten(),axis=1)
Q3[np.arange(T),ini] = 1
gB = 2 * B @ (J * c @ E) @ (beta * Q2 * c @ E + B @ (np.linalg.matrix_power(I - J, 2) * c @ E)).T / beta_square
B += 0.1 * gB
dif = np.max(np.absolute(Q3 - Q2))
kcQ = k[ini]
Q2 = Q3
loop += 1
基本上是遵循随机梯度下降算法,矩阵
B
由B = inv(I - J)
初始化,由B += 0.1 * gB
演化,J
随Q2
变化,每次迭代需要确定Q2
.然而Q2
是一个稀疏矩阵,每列只有一个,其余为零,在代码中是这样的:
ini = np.argmax(c + (B @ (J * c) @ E).flatten(),axis=1)
Q3[np.arange(T),ini] = 1
...
Q2 = Q3
代码目前演示了 1000 x 1000 的矩阵运算,是否可以对其进行优化并运行得更快?
这里有一些改进:
beta * Q2
被计算了两次,第二次可以用J
代替。J * c
也可以多次计算,但可以一次完成。 I - J
也一样。B @ (J * c) @ E
和 B @ (J * c @ E)
在数学上是等效的,但后者在您的情况下更快,也可以计算一次。0.1 * (2 * Matrix / beta_square)
实际上计算一个新矩阵 M2 = 2 * Matrix
,然后是一个新矩阵 M3 = M2 / beta_square
,然后是另一个 M4 = 0.1 * M4
。创建许多像这样的临时矩阵是昂贵的,因为它是一个内存绑定操作,内存带宽在现代机器上非常有限(与计算能力相比),更不用说填充新的临时数组通常比已经填充的临时数组慢(因为虚拟内存,更具体地说是页面错误)。因此,执行 (0.1 * 2 / beta_square) * Matrix
更快(因为乘以浮点数比乘以大矩阵快得多)。np.argmax(c + tmp3.flatten(), axis=1)
或 np.max(np.absolute(Q3 - Q2))
。事实上,大多数out
参数(例如np.multiply(A, B, out=C)
),您可以使用它们。话虽这么说,这里的好处很小,因为inv
需要很长时间。B
,您可以使用np.linalg.solve
代替,如Homer512所述。对于大型矩阵(O(n**3)
对比 O(n**2)
),求解系统的速度明显更快,而且通常更准确。请参阅不要反转矩阵。例如,inv(I-J) @ b
可以替换为 solve(I-J, b)
。使用 solve
的好处并不是那么大,尽管在您的特定用例中,因为稀疏 I-J
矩阵。B
,在循环的最后,那么这个就复杂一点。 Numba 可以帮助编写一个相对快速的矩阵求逆,专门针对您的用例中的稀疏矩阵(因为 Scipy 中的一个非常慢)。np.linalg.matrix_power(tmp0, 2) * c
也可以在 Numba 中针对稀疏矩阵进行优化。这里是一个(几乎没有测试过的)使用 Numba 的实现:
@nb.njit('(float64[:,::1], float64[::1])', parallel=True)
def compute_ini(a, b):
n, m = a.shape
assert b.size == m and m > 0
res = np.empty(n, np.int64)
for i in nb.prange(n):
max_val, max_pos = a[i, 0] + b[0], 0
for j in range(1, m):
val = a[i, j] + b[j]
if val > max_val:
max_val = val
max_pos = j
res[i] = max_pos
return res
@nb.njit('(float64[:,::1], float64[:,::1])', parallel=True)
def max_abs_diff(a, b):
return np.max(np.absolute(a - b))
# Utility function for invert_sparse_matrix
@nb.njit
def invert_sparse_matrix_subkernel(b, out, i1, n, eps):
for i2 in range(n):
if i2 != i1:
scale = b[i2, i1]
if abs(scale) >= eps:
for j in range(n):
b[i2, j] -= scale * b[i1, j]
out[i2, j] -= scale * out[i1, j]
@nb.njit('(float64[:,::1], float64[:,::1])', parallel=True)
def invert_sparse_matrix(a, out):
eps = 1e-14
n, m = a.shape
assert n == m and out.shape == (n,n)
b = np.empty((n, n))
for i in nb.prange(n):
out[i, :i] = 0.0
out[i, i] = 1.0
out[i, i+1:] = 0.0
b[i, :] = a[i, :]
for i1 in range(n):
scale = 1.0 / b[i1, i1]
if abs(scale) < eps:
b[i1, :].fill(0.0)
out[i1, :].fill(0.0)
invert_sparse_matrix_subkernel(b, out, i1, n, eps)
elif abs(scale-1.0) < eps:
invert_sparse_matrix_subkernel(b, out, i1, n, eps)
else:
b[i1, :] *= scale
out[i1, :] *= scale
invert_sparse_matrix_subkernel(b, out, i1, n, eps)
@nb.njit('(float64[:,::1], float64[:,::1], float64[:,::1])', parallel=True)
def sparse_square_premult(a, premult, out):
eps = 1e-14
n, m = a.shape
assert n == m
assert premult.shape == (n, n)
assert out.shape == (n, n)
for i in nb.prange(n):
out[i, :] = 0.0
for j in range(n):
if abs(a[i, j]) >= eps:
for k in range(n):
out[i, k] += a[i, j] * a[j, k]
out[i, :] *= premult[i, :]
def compute_numba():
# parameters
beta = 0.98
alpha = 0.03
delta = 0.1
T = 1000
loop = 1
dif = 1
tol = 1e-8
kss = ((1 / beta - (1 - delta)) / alpha)**(1 / (alpha - 1))
k = np.linspace(0.5 * kss, 1.8 * kss, T)
k_reshaped = k.reshape(-1, 1)
c = k_reshaped ** alpha + (1 - delta) * k_reshaped - k
c[c<0] = 1e-11
c = np.log(c)
beta_square = beta**2
# multiplication
I = np.identity(T)
E = np.ones(T)[:,None]
Q2 = I
J = np.empty((T, T))
tmp0 = np.empty((T, T))
tmp1 = np.empty((T, T))
tmp2 = np.empty((T, 1))
tmp3 = np.empty((T, 1))
tmp4 = np.empty((T, T))
B = np.empty((T, T))
while np.any(dif > tol) and loop < 200:
np.multiply(beta, Q2, out=J)
np.subtract(I, J, out=tmp0)
invert_sparse_matrix(tmp0, B)
Q3 = np.zeros((T,T))
np.multiply(J, c, out=tmp1)
np.matmul(tmp1, E, out=tmp2)
np.matmul(B, tmp2, out=tmp3)
ini = compute_ini(c, tmp3.flatten())
Q3[np.arange(T), ini] = 1
factor = 0.1 * 2 / beta_square
sparse_square_premult(tmp0, c, tmp4)
np.add(B, (factor * tmp3) @ (tmp2 + B @ (tmp4 @ E)).T, out=B)
dif = max_abs_diff(Q3, Q2)
kcQ = k[ini]
Q2 = Q3
loop += 1
compute_numba()
请注意,
invert_sparse_matrix
函数仍然需要很长时间(在我的机器上 >50%),尽管它比 inv
快大约 3 倍,和 solve
一样快。它是对 naive inversion algorithm 的改进,对非常稀疏的矩阵几乎没有优化。它当然可以进一步优化(例如使用平铺),但这肯定不是一件容易的事(特别是对于新手程序员)。
注意编译时间需要几秒钟。
总的来说,这个实现比我的 i5-9600KF 处理器(6 核)上的初始实现快 4~5 倍。