并行和线性嵌套循环不匹配

问题描述 投票:0回答:1

我想并行化类似于以下代码:

 Ngal=10
 sampind=[7,16,22,31,45]
 samples=0.3*np.ones((60,Ngal))
 zt=[2.15,7.16,1.23,3.05,4.1,2.09,1.324,3.112,0.032,0.2356]
 toavg=[]
 for j in range(Ngal):
     gal=[] 
     for m in sampind:
          gal.append(samples[m][j]-zt[j])
     toavg.append(np.mean(gal))
 accuracy=np.mean(toavg)

所以我听从建议here,我将其重写如下:

toavg=[]
gal=[]
p = mp.Pool()

def deltaz(params):
    j=params[0] # index of the galaxy
    m=params[1] # indices for which we have sampled redshifts
    gal.append(samples[m][j]-zt[j])
    return np.mean(gal)

j=(np.linspace(0,Ngal-1,Ngal).astype(int))
m=sampind
grid=[j,m]
input=itertools.product(*grid)
results = p.map(deltaz,input)
accuracy=np.mean(results)
p.close()
p.join()

但是结果不一样。实际上,有时候是,有时不是。它似乎不是确定性的。我的方法正确吗?如果没有,我该怎么解决?谢谢!复制上述示例所需的模块是:

import numpy as np
import multiprocess as mp
import itertools

谢谢!

python for-loop itertools multiprocess
1个回答
0
投票

我看到的第一个问题是,您正在创建一个全局变量gal,该变量将由deltaz函数访问。但是,这些不是在池进程之间共享的,而是分别为每个进程实例化的。如果要让他们共享此结构,则必须使用共享内存。这可能就是为什么您看到不确定的行为的原因。

下一个问题是您实际上并没有完成具有不同版本的相同任务。您要获取的第一个平均值是每组平均值(gal)的平均值。并行计算是取平均每个元素最终出现在该列表中的平均值。这是不确定的,因为在项目可用时将其分配给流程,而这不一定是可预测的。

我建议并行化内部循环。为此,您需要将zt和sample都放在共享内存中,因为所有进程都可以访问它们。如果您要修改数据,这可能会很危险,但是由于您似乎只是在读取数据,所以应该没问题。

import numpy as np
import multiprocessing as mp
import itertools
import ctypes
#Non-parallel code
Ngal=10
sampind=[7,16,22,31,45]
samples=0.3*np.ones((60,Ngal))
zt=[2.15,7.16,1.23,3.05,4.1,2.09,1.324,3.112,0.032,0.2356]
#Nonparallel
toavg=[]
for j in range(Ngal):
    gal=[]
    for m in sampind:
         gal.append(samples[m][j]-zt[j])
    toavg.append(np.mean(gal))
accuracy=np.mean(toavg)
print(toavg)

# Parallel function
def deltaz(j):
    sampind=[7,16,22,31,45]
    gal = []
    for m in sampind:
         gal.append(samples[m][j]-zt[j])
    return np.mean(gal)
# Shared array for zt
zt_base = mp.Array(ctypes.c_double, int(len(zt)),lock=False)
ztArr = np.ctypeslib.as_array(zt_base)
#Shared array for samples
sample_base = mp.Array(ctypes.c_double, int(np.product(samples.shape)),lock=False)
sampArr = np.ctypeslib.as_array(sample_base)
sampArr = sampArr.reshape(samples.shape)
#Copy arrays to shared
sampArr[:,:] = samples[:,:]
ztArr[:] = zt[:]
with mp.Pool() as p:
    result = p.map(deltaz,(np.linspace(0,Ngal-1,Ngal).astype(int)))
    print(result)

这里是产生相同结果的示例。您可以根据需要添加更多的复杂性,但是我会阅读有关常规的多处理以及内存类型/作用域的知识,以了解什么将起作用,哪些将不起作用。进入多处理世界时,您必须格外小心。让我知道这是否无济于事,我将尝试对其进行更新,以使其起作用。

© www.soinside.com 2019 - 2024. All rights reserved.