[在Windows中运行python多重处理时的高内存使用率

问题描述 投票:1回答:1

下面的代码是一个人为的示例,它模拟了我遇到的一个实际问题,该问题使用多重处理来加快代码的速度。代码在Windows 10 64-bit OSpython 3.7.5ipython 7.9.0]上运行


转换函数(这些函数将用于转换main()中的数组)

from itertools import product
from functools import partial

from numba import njit, prange
import multiprocessing as mp
import numpy as np

@njit(parallel= True)
def transform_array_c(data, n):

    ar_len= len(data)

    sec_max1= np.empty(ar_len, dtype = data.dtype)
    sec_max2= np.empty(ar_len, dtype = data.dtype)

    for i in prange(n-1):
        sec_max1[i]= np.nan

    for sec in prange(ar_len//n):
        s2_max= data[n*sec+ n-1]
        s1_max= data[n*sec+ n]

        for i in range(n-1,-1,-1):
            if data[n*sec+i] > s2_max:
                s2_max= data[n*sec+i]
            sec_max2[n*sec+i]= s2_max

        sec_max1[n*sec+ n-1]= sec_max2[n*sec]

        for i in range(n-1):
            if n*sec+n+i < ar_len:
                if data[n*sec+n+i] > s1_max:
                    s1_max= data[n*sec+n+i]
                sec_max1[n*sec+n+i]= max(s1_max, sec_max2[n*sec+i+1])

            else:
                break

    return sec_max1  

@njit(error_model= 'numpy', cache= True)
def rt_mean_sq_dev(array1, array2, n):
    msd_temp = np.empty(array1.shape[0])

    K = array2[n-1]

    rs_x= array1[0] - K
    rs_xsq = rs_x *rs_x

    msd_temp[0] = np.nan

    for i in range(1,n):
        rs_x += array1[i] - K
        rs_xsq += np.square(array1[i] - K)
        msd_temp[i] = np.nan

    y_i = array2[n-1] - K
    msd_temp[n-1] = np.sqrt(max(y_i*y_i + (rs_xsq - 2*y_i*rs_x)/n, 0))

    for i in range(n, array1.shape[0]):
        rs_x = array1[i] - array1[i-n]+ rs_x
        rs_xsq = np.square(array1[i] - K) - np.square(array1[i-n] - K) + rs_xsq
        y_i = array2[i] - K

        msd_temp[i] = np.sqrt(max(y_i*y_i + (rs_xsq - 2*y_i*rs_x)/n, 0))

    return msd_temp 

@njit(cache= True)
def transform_array_a(data, n):
    result = np.empty(data.shape[0], dtype= data.dtype)
    alpharev = 1. - 2 / (n + 1)
    alpharev_exp = alpharev

    e = data[0]
    w = 1.

    if n == 2: result[0] = e
    else:result[0] = np.nan

    for i in range(1, data.shape[0]):
        w += alpharev_exp
        e = e*alpharev + data[i]

        if i > n -3:result[i] = e / w
        else:result[i] = np.nan

        if alpharev_exp > 3e-307:alpharev_exp*= alpharev
        else:alpharev_exp=0.

    return result

多处理部分

def func(tup, data):    #<-------------the function to be run among all 
    a_temp= a[tup[2][0]]

    idx1 = a_temp > a[tup[2][1]]
    idx2= a_temp < b[(tup[2][1], tup[1][1])]

    c_final = c[tup[0][1]][idx1 | idx2]
    data_final= data[idx1 | idx2]

    return (tup[0][0], tup[1][0], *tup[2]), c_final[-1] - data_final[-1]

def setup(a_dict, b_dict, c_dict):    #initialize the shared dictionaries
    global a,b,c
    a,b,c = a_dict, b_dict, c_dict

def main(a_arr, b_arr, c_arr, common_len):
    np.random.seed(0)
    data_array= np.random.normal(loc= 24004, scale=500, size= common_len)

    a_size = a_arr[-1] + 1
    b_size = len(b_arr)
    c_size = len(c_arr)

    loop_combo = product(enumerate(c_arr),
                         enumerate(b_arr),
                         (n_tup for n_tup in product(np.arange(1,a_arr[-1]), a_arr) if n_tup[1] > n_tup[0])
                         )
    result = np.zeros((c_size, b_size, a_size -1 ,a_size), dtype = np.float32) 

    ###################################################
    #This part simulates the heavy-computation in the actual problem

    a= {}
    b= {}
    c= {}

    for i in range(1, a_arr[-1]+1):

        a[i]= transform_array_a(data_array, i)
        if i in a_arr:
            for j in b_arr:
                b[(i,j)]= rt_mean_sq_dev(data_array, a[i], i)/data_array *j


    for i in c_arr:
        c[i]= transform_array_c(data_array, i)

    ###################################################    
    with mp.Pool(processes= mp.cpu_count() - 1,
                 initializer= setup,
                 initargs= [a,b,c]
                 ) as pool:
        mp_res= pool.imap_unordered(partial(func, data= data_array),
                                    loop_combo
                                    )

        for item in mp_res:
            result[item[0]] =item[1]


    return result


if __name__ == '__main__':
    mp.freeze_support()

    a_arr= np.arange(2,44,2)
    b_arr= np.arange(0.4,0.8, 0.20)
    c_arr= np.arange(2,42,10)
    common_len= 440000

    final_res= main(a_arr, b_arr, c_arr, common_len)

出于性能原因,在所有进程之间使用多个共享的“只读”字典来减少冗余计算(在实际问题中,在所有进程之间使用共享字典后,总计算时间减少了40%)。但是,在我的实际问题中使用共享字典后,ram的使用率变得高得离谱。我的6C / 12T Windows计算机的内存使用率从(峰值8.2GB,空闲5.0GB)变为(峰值23.9GB,空闲5.0GB),要获得40%的加速,要付出的代价太高了。] >

在进程之间使用多个共享数据时,不可避免的是高内存使用率吗?为了使我的代码在使用尽可能少的内存的同时尽可能快地执行什么操作?

提前谢谢您


注:我尝试使用imap_unordered()而不是map,因为我听说应该在输入可迭代的输入较大时减少内存使用,但是老实说我看不到ram使用的改进。也许我在这里做错了?


编辑:由于答案中的反馈,我已经更改了代码的繁重计算部分,以使它看起来更虚假,并且类似于实际问题中的计算。

下面的代码是一个人为的示例,它模拟了我遇到的一个实际问题,该问题使用多重处理来加快代码的速度。该代码在Windows 10 64位操作系统,python 3.7.5和ipython 7 ....上运行....

python performance memory multiprocessing parallelism-amdahl
1个回答
2
投票

在Windows中运行的python multiprocessing中处理共享字典时的高内存使用率>>

© www.soinside.com 2019 - 2024. All rights reserved.