我在尝试使用 Dask 实现共轭梯度算法(用于教学目的)时,我意识到性能比简单的 numpy 实现要差得多。 经过几次实验,我已经能够将问题简化为以下片段:
import numpy as np
import dask.array as da
from time import time
def test_operator(f, test_vector, library=np):
for n in (10, 20, 30):
v = test_vector()
start_time = time()
for i in range(n):
v = f(v)
k = library.linalg.norm(v)
try:
k = k.compute()
except AttributeError:
pass
print(k)
end_time = time()
print('Time for {} iterations: {}'.format(n, end_time - start_time))
print('NUMPY!')
test_operator(
lambda x: x + x,
lambda: np.random.rand(4_000, 4_000)
)
print('DASK!')
test_operator(
lambda x: x + x,
lambda: da.from_array(np.random.rand(4_000, 4_000), chunks=(2_000, 2_000)),
da
)
在代码中,我简单地将一个向量乘以 2(这就是 f 所做的)并打印它的范数。使用 dask 运行时,每次迭代都会减慢一点。如果我不计算
k
,v
的范数,这个问题就不会发生。
不幸的是,在我的例子中,
k
是我用来停止共轭梯度算法的残差范数。我怎样才能避免这个问题?为什么会这样?
谢谢!
问题出在Numpy和Dask之间的实现差异。在 Dask 实现中,循环的每次迭代都会触发工作人员之间的通信步骤。具体来说,循环的每次迭代都需要计算 Dask 数组的范数,不能并行计算,需要 worker 之间的通信。
相比之下,Numpy 实现可以在一次计算中计算数组的范数,而无需任何额外的通信成本。这是因为整个数组都存储在内存中,并且可以由单个进程访问。
为了解决这个问题,一个解决方案可能是使用Dask的持久化方法将中间结果保存在内存中,以便它们可以在后续迭代中重复使用,而不会产生额外的通信成本。 因此,代码可以修改如下:
“” def test_operator(f, test_vector, library=np): 对于 (10, 20, 30) 中的 n: v = test_vector()
start_time = time()
for i in range(n):
v = f(v)
if library == da:
v = v.persist() # menținerea rezultatului intermediar
k = library.linalg.norm(v)
try:
k = k.compute()
except AttributeError:
pass
print(k)
end_time = time()
print('Timp pentru {} iterații: {}'.format(n, end_time - start_time))
“”