下面的代码是一个人为的示例,它模拟了我遇到的一个实际问题,该问题使用多重处理来加快代码的速度。代码在Windows 10 64-bit OS
,python 3.7.5
和ipython 7.9.0
]上运行
转换函数(这些函数将用于转换main()
中的数组)
from itertools import product from functools import partial from numba import njit, prange import multiprocessing as mp import numpy as np @njit(parallel= True) def transform_array_c(data, n): ar_len= len(data) sec_max1= np.empty(ar_len, dtype = data.dtype) sec_max2= np.empty(ar_len, dtype = data.dtype) for i in prange(n-1): sec_max1[i]= np.nan for sec in prange(ar_len//n): s2_max= data[n*sec+ n-1] s1_max= data[n*sec+ n] for i in range(n-1,-1,-1): if data[n*sec+i] > s2_max: s2_max= data[n*sec+i] sec_max2[n*sec+i]= s2_max sec_max1[n*sec+ n-1]= sec_max2[n*sec] for i in range(n-1): if n*sec+n+i < ar_len: if data[n*sec+n+i] > s1_max: s1_max= data[n*sec+n+i] sec_max1[n*sec+n+i]= max(s1_max, sec_max2[n*sec+i+1]) else: break return sec_max1 @njit(error_model= 'numpy', cache= True) def rt_mean_sq_dev(array1, array2, n): msd_temp = np.empty(array1.shape[0]) K = array2[n-1] rs_x= array1[0] - K rs_xsq = rs_x *rs_x msd_temp[0] = np.nan for i in range(1,n): rs_x += array1[i] - K rs_xsq += np.square(array1[i] - K) msd_temp[i] = np.nan y_i = array2[n-1] - K msd_temp[n-1] = np.sqrt(max(y_i*y_i + (rs_xsq - 2*y_i*rs_x)/n, 0)) for i in range(n, array1.shape[0]): rs_x = array1[i] - array1[i-n]+ rs_x rs_xsq = np.square(array1[i] - K) - np.square(array1[i-n] - K) + rs_xsq y_i = array2[i] - K msd_temp[i] = np.sqrt(max(y_i*y_i + (rs_xsq - 2*y_i*rs_x)/n, 0)) return msd_temp @njit(cache= True) def transform_array_a(data, n): result = np.empty(data.shape[0], dtype= data.dtype) alpharev = 1. - 2 / (n + 1) alpharev_exp = alpharev e = data[0] w = 1. if n == 2: result[0] = e else:result[0] = np.nan for i in range(1, data.shape[0]): w += alpharev_exp e = e*alpharev + data[i] if i > n -3:result[i] = e / w else:result[i] = np.nan if alpharev_exp > 3e-307:alpharev_exp*= alpharev else:alpharev_exp=0. return result
多处理部分
def func(tup, data): #<-------------the function to be run among all a_temp= a[tup[2][0]] idx1 = a_temp > a[tup[2][1]] idx2= a_temp < b[(tup[2][1], tup[1][1])] c_final = c[tup[0][1]][idx1 | idx2] data_final= data[idx1 | idx2] return (tup[0][0], tup[1][0], *tup[2]), c_final[-1] - data_final[-1] def setup(a_dict, b_dict, c_dict): #initialize the shared dictionaries global a,b,c a,b,c = a_dict, b_dict, c_dict def main(a_arr, b_arr, c_arr, common_len): np.random.seed(0) data_array= np.random.normal(loc= 24004, scale=500, size= common_len) a_size = a_arr[-1] + 1 b_size = len(b_arr) c_size = len(c_arr) loop_combo = product(enumerate(c_arr), enumerate(b_arr), (n_tup for n_tup in product(np.arange(1,a_arr[-1]), a_arr) if n_tup[1] > n_tup[0]) ) result = np.zeros((c_size, b_size, a_size -1 ,a_size), dtype = np.float32) ################################################### #This part simulates the heavy-computation in the actual problem a= {} b= {} c= {} for i in range(1, a_arr[-1]+1): a[i]= transform_array_a(data_array, i) if i in a_arr: for j in b_arr: b[(i,j)]= rt_mean_sq_dev(data_array, a[i], i)/data_array *j for i in c_arr: c[i]= transform_array_c(data_array, i) ################################################### with mp.Pool(processes= mp.cpu_count() - 1, initializer= setup, initargs= [a,b,c] ) as pool: mp_res= pool.imap_unordered(partial(func, data= data_array), loop_combo ) for item in mp_res: result[item[0]] =item[1] return result if __name__ == '__main__': mp.freeze_support() a_arr= np.arange(2,44,2) b_arr= np.arange(0.4,0.8, 0.20) c_arr= np.arange(2,42,10) common_len= 440000 final_res= main(a_arr, b_arr, c_arr, common_len)
出于性能原因,在所有进程之间使用多个共享的“只读”字典来减少冗余计算(在实际问题中,在所有进程之间使用共享字典后,总计算时间减少了40%)。但是,在我的实际问题中使用共享字典后,ram的使用率变得高得离谱。我的6C / 12T Windows计算机的内存使用率从(峰值8.2GB,空闲5.0GB)变为(峰值23.9GB,空闲5.0GB),要获得40%的加速,要付出的代价太高了。] >
在进程之间使用多个共享数据时,不可避免的是高内存使用率吗?为了使我的代码在使用尽可能少的内存的同时尽可能快地执行什么操作?
提前谢谢您
注:我尝试使用imap_unordered()
而不是map
,因为我听说应该在输入可迭代的输入较大时减少内存使用,但是老实说我看不到ram使用的改进。也许我在这里做错了?
编辑:由于答案中的反馈,我已经更改了代码的繁重计算部分,以使它看起来更虚假,并且类似于实际问题中的计算。
下面的代码是一个人为的示例,它模拟了我遇到的一个实际问题,该问题使用多重处理来加快代码的速度。该代码在Windows 10 64位操作系统,python 3.7.5和ipython 7 ....上运行....
在Windows中运行的python
multiprocessing
中处理共享字典时的高内存使用率>>