我正在尝试使用 multiprocessing
加速一个函数,在这个函数中,我将2000个形状(76, 76)的数组平铺成3D数组,并应用一个缩放因子。
当瓷砖的数量少于200个时,它工作得很好,但我得到的是一个 Killed: 9
当它大于这个数字时,我需要能够处理1000个瓷砖。
这是一个简化版的代码。
from functools import partial
from multiprocessing.pool import ThreadPool
from multiprocessing import cpu_count
import numpy as np
def func_A(data, scale, N):
"""Tile the data N times and scale it"""
arr = np.tile(data, (N, 1, 1))
arr *= scale
return arr
def func_B(N=4):
"""Create scaled arrays"""
# Make data
data = np.random.normal(size=(2000, 76, 76))
# Make scales
scales = np.arange(2000)
# Multiprocess into tiled arrays
pool = ThreadPool(cpu_count())
func = partial(func_A, N=N)
inpt = list(zip(data, scales))
results = np.asarray(pool.starmap(func, inpt), dtype=np.float64)
pool.close()
pool.join()
return results.swapaxes(0, 1)
所以,这对于 func_B(4)
而死 func_B(500)
.
我知道我用这么大的数组对Python的内存造成了负担,但有什么最好的方法来获得 func_B
与大型企业合作 N
........最好快点?我使用的是 multiprocessing
错?我是否应该完全使用其他的东西,例如Dask、Numba、Cython等?
任何帮助将是非常感激的。谢谢!我正在尝试使用多进程来加速一个函数,其中有一个函数是Dask,Numba,Cython等。
我不太确定您的计算目的是什么,但以下内容似乎可以在dask中完成这项工作。
import dask.array as da
import numpy as np
# Make data
data = da.random.normal(size=(2000, 76, 76), chunks=(2000, 76, 76))
# Make scales
scales = np.arange(2000)
N = 500
out = da.repeat(data, N, axis=0).reshape((N, 2000, 76, 76)) * scales.reshape((1, 2000, 1, 1))
out = out.sum(axis=0).compute()
保持工作内存<~5GB,并使用你的大部分核心。
我认为覆盖内存问题的最直观的解决方案是使用float16数组。我尝试用更简单的方式重写所有的过程(func_C)
### your method ###
def func_A(data, scale, N):
"""Tile the data N times and scale it"""
arr = np.tile(data, (N, 1, 1))
arr *= scale
return arr
def func_B(N=4):
"""Create scaled arrays"""
# Make data
data = np.random.normal(size=(2000, 76, 76)).astype(np.float16) ###### set float16
# Make scales
scales = np.arange(2000).astype(np.float16) ###### set float16
# Multiprocess into tiled arrays
pool = ThreadPool(cpu_count())
func = partial(func_A, N=N)
inpt = list(zip(data, scales))
results = np.asarray(pool.starmap(func, inpt), dtype=np.float16) ###### set float16
pool.close()
pool.join()
return results.swapaxes(0, 1)
### alternative method ###
def func_C(N=4):
scales = np.arange(2000).astype(np.float16)
data = np.random.normal(size=(2000, 76, 76)).astype(np.float16)
results = np.stack(N*[data*scales[:,None,None]])
return results
检查结果
np.random.seed(33)
a = func_B(10)
np.random.seed(33)
b = func_C(10)
(a == b).all() # ===> TRUE
检查业绩
所以,这是我在经历了艰苦的任务之后的观察。
(2000, 2000, 76, 76)
并且是由 float64
类型值。粗略计算一下,这个数组的大小是。2000*2000*76*76*8字节=约170GB。........所以你绝对不可能一下子把所有的东西都记在心里。multiprocessing
是复杂的(对于一个没有严格研究多处理的人来说,一直都是这样),而且它产生的计算时间也不是很好。例如,在 谷歌合作,(特斯拉T4 GPU后端,12GB内存)。N = 50
需要约4.5秒(最少)的时间来运行。一个更好的模块实现可能是可能的,但我不是那个人。我的行动方案。
为了解决第二个问题,我使用 cupy
,这应该是一个 替换式 对于 numpy
在Python中。所谓 drop-in 替换,指的是您可以替换掉 numpy
与 cupy
在你的代码中随处可见(也有例外--与此问题无关)。cupy
但在Nvidia GPU上使用了CUDA,因此你需要在进行 cupy
安装。(检查这个指南。)或者,如果可能的话,你可能会更喜欢在线计算资源,就像我用的 谷歌合作. 我还把工作分成几部分。我使用了一个函数 fnh(a, scale, N)
来计算任意的按比例排列的数组。N
. 我将预定的输出数组切成多个部分,然后反复运行 fnh(...)
在这些切片上。为了更好的优化,可以对切片进行调整,但我只是用了一些基于粗略猜测的东西。
下面是代码。
import cupy as cp
def fnh(a, scale, N):
arr = cp.einsum('i,ijk->ijk', scale, a)
result = cp.tile(arr, (N, 1, 1, 1))
del arr
return result
def slicer(arr, scales, N = 400):
mempool = cp.get_default_memory_pool()
pinned_mempool = cp.get_default_pinned_memory_pool()
# result = np.empty((N, 2000, 76, 76)) # to large to be allocated
section = 500 # Choices subject
parts = 80 # to optimization
step = N // parts
for i in range(parts): # Slice N into equal parts
begin = i*step
end = begin + step
stacked = cp.empty((step, 2000, 76, 76))
for j in range(2000 // section): # Section the 2000 arrays into equal parts
begin = j*section
end = begin + section
s = scales[begin:end]
a = arr[begin:end]
res = fnh(a, s, step)
stacked[:, begin:end] = res # Accumulate values
del a, res
# result[begin:end] = stacked # This is where we were supposed to
# accumulate values in result
del stacked
mempool.free_all_blocks()
pinned_mempool.free_all_blocks()
首先,我用了 cupy.einsum
来计算缩放比例 矢量 上的数组。
其次,我尽可能地删除数组以回收空间。具体来说,必须重新分配空间,由 cupy
在GPU内存池中使用 mempool.free_all_blocks()
和 pinned_mempool.free_all_blocks()
,以恢复可用的GPU内存。了解更多 此处. 然而,由于 cupy
缓存所分配的内存,以有限的方式使用这种缓存可能会对加速有所帮助。(这是一种预感,我不是特别了解。)所以我将同样的内存用于分割瓷砖,并在完成一个N-slice后将其清除。
第三,凡 # result[begin:end] = stacked
是,你应该 卸载 数组;因为你无法承受整个数组在内存中的费用,如前所述。在你认为合适的地方和方式将其卸载到某个bin中。你的 应用可能是避免内存问题的好办法。
第四,这段代码是不完整的。这是因为形成的数组需要适当的处理,如前所述。但它做的是主要的重头戏。
最后,要对这段代码进行计时,使用 timeit
,在 谷歌合作: 供比较: N = 50
需要约50毫秒(最少),并且 N = 2000
需要约7.4秒(最少)来运行。
更新:改用 parts = 40
和 section = 250
使得最短的时间降到了~6.1秒。
好吧,我相信会有更好的方法来写这段代码,我很期待!