Python多处理抛出Killed。9

问题描述 投票:3回答:1

我正在尝试使用 multiprocessing 加速一个函数,在这个函数中,我将2000个形状(76, 76)的数组平铺成3D数组,并应用一个缩放因子。

当瓷砖的数量少于200个时,它工作得很好,但我得到的是一个 Killed: 9 当它大于这个数字时,我需要能够处理1000个瓷砖。

这是一个简化版的代码。

from functools import partial
from multiprocessing.pool import ThreadPool
from multiprocessing import cpu_count
import numpy as np

def func_A(data, scale, N):
    """Tile the data N times and scale it"""
    arr = np.tile(data, (N, 1, 1))
    arr *= scale
    return arr

def func_B(N=4):
    """Create scaled arrays"""
    # Make data
    data = np.random.normal(size=(2000, 76, 76))

    # Make scales
    scales = np.arange(2000)

    # Multiprocess into tiled arrays
    pool = ThreadPool(cpu_count())
    func = partial(func_A, N=N)
    inpt = list(zip(data, scales))
    results = np.asarray(pool.starmap(func, inpt), dtype=np.float64)
    pool.close()
    pool.join()

    return results.swapaxes(0, 1)

所以,这对于 func_B(4) 而死 func_B(500).

我知道我用这么大的数组对Python的内存造成了负担,但有什么最好的方法来获得 func_B 与大型企业合作 N........最好快点?我使用的是 multiprocessing 错?我是否应该完全使用其他的东西,例如Dask、Numba、Cython等?

任何帮助将是非常感激的。谢谢!我正在尝试使用多进程来加速一个函数,其中有一个函数是Dask,Numba,Cython等。

python numpy multiprocessing dask numba
1个回答
2
投票

我不太确定您的计算目的是什么,但以下内容似乎可以在dask中完成这项工作。

import dask.array as da
import numpy as np

# Make data
data = da.random.normal(size=(2000, 76, 76), chunks=(2000, 76, 76))

# Make scales
scales = np.arange(2000)
N = 500
out = da.repeat(data, N, axis=0).reshape((N, 2000, 76, 76)) * scales.reshape((1, 2000, 1, 1))
out = out.sum(axis=0).compute()

保持工作内存<~5GB,并使用你的大部分核心。


2
投票

我认为覆盖内存问题的最直观的解决方案是使用float16数组。我尝试用更简单的方式重写所有的过程(func_C)

### your method ###

def func_A(data, scale, N):
    """Tile the data N times and scale it"""
    arr = np.tile(data, (N, 1, 1))
    arr *= scale
    return arr

def func_B(N=4):
    """Create scaled arrays"""
    # Make data
    data = np.random.normal(size=(2000, 76, 76)).astype(np.float16) ###### set float16

    # Make scales
    scales = np.arange(2000).astype(np.float16) ###### set float16

    # Multiprocess into tiled arrays
    pool = ThreadPool(cpu_count())
    func = partial(func_A, N=N)
    inpt = list(zip(data, scales))
    results = np.asarray(pool.starmap(func, inpt), dtype=np.float16) ###### set float16
    pool.close()
    pool.join()

    return results.swapaxes(0, 1)

### alternative method ###

def func_C(N=4):

    scales = np.arange(2000).astype(np.float16)
    data = np.random.normal(size=(2000, 76, 76)).astype(np.float16)
    results = np.stack(N*[data*scales[:,None,None]])

    return results

检查结果

np.random.seed(33)
a = func_B(10)
np.random.seed(33)
b = func_C(10)
(a == b).all() # ===> TRUE

检查业绩

enter image description here


1
投票

所以,这是我在经历了艰苦的任务之后的观察。

  • 你最终要得到的输出数组是4维的,形状是: (2000, 2000, 76, 76) 并且是由 float64 类型值。粗略计算一下,这个数组的大小是。2000*2000*76*76*8字节=约170GB。........所以你绝对不可能一下子把所有的东西都记在心里。
  • 记忆的用法 multiprocessing 是复杂的(对于一个没有严格研究多处理的人来说,一直都是这样),而且它产生的计算时间也不是很好。例如,在 谷歌合作,(特斯拉T4 GPU后端,12GB内存)。N = 50 需要约4.5秒(最少)的时间来运行。一个更好的模块实现可能是可能的,但我不是那个人。

我的行动方案。

为了解决第二个问题,我使用 cupy,这应该是一个 替换式 对于 numpy 在Python中。所谓 drop-in 替换,指的是您可以替换掉 numpycupy 在你的代码中随处可见(也有例外--与此问题无关)。cupy 但在Nvidia GPU上使用了CUDA,因此你需要在进行 cupy 安装。(检查这个指南。)或者,如果可能的话,你可能会更喜欢在线计算资源,就像我用的 谷歌合作. 我还把工作分成几部分。我使用了一个函数 fnh(a, scale, N) 来计算任意的按比例排列的数组。N. 我将预定的输出数组切成多个部分,然后反复运行 fnh(...) 在这些切片上。为了更好的优化,可以对切片进行调整,但我只是用了一些基于粗略猜测的东西。

下面是代码。

import cupy as cp


def fnh(a, scale, N):
    arr = cp.einsum('i,ijk->ijk', scale, a)
    result = cp.tile(arr, (N, 1, 1, 1))

    del arr
    return result


def slicer(arr, scales, N = 400):
    mempool = cp.get_default_memory_pool()
    pinned_mempool = cp.get_default_pinned_memory_pool()
    # result = np.empty((N, 2000, 76, 76))    # to large to be allocated

    section = 500                             # Choices subject
    parts = 80                                # to optimization
    step = N // parts

    for i in range(parts):                    # Slice N into equal parts
        begin = i*step
        end = begin + step

        stacked = cp.empty((step, 2000, 76, 76))

        for j in range(2000 // section):      # Section the 2000 arrays into equal parts
            begin = j*section
            end = begin + section

            s = scales[begin:end]
            a = arr[begin:end]
            res = fnh(a, s, step)
            stacked[:, begin:end] = res       # Accumulate values

            del a, res

        # result[begin:end] = stacked         # This is where we were supposed to 
                                              # accumulate values in result
        del stacked
        mempool.free_all_blocks()
        pinned_mempool.free_all_blocks()

首先,我用了 cupy.einsum 来计算缩放比例 矢量 上的数组。

其次,我尽可能地删除数组以回收空间。具体来说,必须重新分配空间,由 cupy 在GPU内存池中使用 mempool.free_all_blocks()pinned_mempool.free_all_blocks(),以恢复可用的GPU内存。了解更多 此处. 然而,由于 cupy 缓存所分配的内存,以有限的方式使用这种缓存可能会对加速有所帮助。(这是一种预感,我不是特别了解。)所以我将同样的内存用于分割瓷砖,并在完成一个N-slice后将其清除。

第三,凡 # result[begin:end] = stacked 是,你应该 卸载 数组;因为你无法承受整个数组在内存中的费用,如前所述。在你认为合适的地方和方式将其卸载到某个bin中。你的 应用可能是避免内存问题的好办法。

第四,这段代码是不完整的。这是因为形成的数组需要适当的处理,如前所述。但它做的是主要的重头戏。

最后,要对这段代码进行计时,使用 timeit,在 谷歌合作: 供比较: N = 50 需要约50毫秒(最少),并且 N = 2000 需要约7.4秒(最少)来运行。

更新:改用 parts = 40section = 250 使得最短的时间降到了~6.1秒。

好吧,我相信会有更好的方法来写这段代码,我很期待!

© www.soinside.com 2019 - 2024. All rights reserved.