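I want to parallelize the iterations of a for loop in Python using OpenMP or MPI scatter/gather. The code is shown below. I also consulted the manual for MPI in Python and used it as a starting point.
Here is my code: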
import numpy as np
from mpi4py import MPI

def _encode_sample(self, idxes):
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()
    #print(size, "this is size--num of processes")

    # Split the indices into nearly equal parts, one per process
    local_idxes = np.array_split(idxes, size)[rank]

    # Initialize local buffers for the samples
    local_obses_t, local_actions, local_rewards, local_obses_tp1, local_dones = [], [], [], [], []

    # Collect samples for the local indices
    for i in local_idxes:
        data = self._storage[i]
        obs_t, action, reward, obs_tp1, done = data
        local_obses_t.append(obs_t)
        local_actions.append(action)
        local_rewards.append(reward)
        local_obses_tp1.append(obs_tp1)
        local_dones.append(done)

    # Convert local lists to numpy arrays
    local_obses_t = np.array(local_obses_t, dtype=np.float64)
    local_actions = np.array(local_actions, dtype=np.float32)
    local_rewards = np.array(local_rewards, dtype=np.int64)
    local_obses_tp1 = np.array(local_obses_tp1, dtype=np.float64)
    local_dones = np.array(local_dones, dtype=np.float32)

    # Print out the local numpy arrays
    print("Local obses_t:", len(local_obses_t))
    print("Local obses_tp1:", len(local_obses_tp1))

    # Gather the samples from all processes to the root process
    obses_t = comm.gather(local_obses_t, root=0)
    actions = comm.gather(local_actions, root=0)
    rewards = comm.gather(local_rewards, root=0)
    obses_tp1 = comm.gather(local_obses_tp1, root=0)
    dones = comm.gather(local_dones, root=0)

    # Print out the gathered numpy arrays
    if rank == 0:
        print("Gathered obses_t:", obses_t)
        print("Gathered obses_tp1:", obses_tp1)

        # Concatenate the gathered samples into numpy arrays
        obses_t = np.concatenate(obses_t, axis=0)
        actions = np.concatenate(actions, axis=0)
        rewards = np.concatenate(rewards, axis=0)
        obses_tp1 = np.concatenate(obses_tp1, axis=0)
        dones = np.concatenate(dones, axis=0)
        return obses_t, actions, rewards, obses_tp1, dones

Original code:

def _encode_sample(self, idxes):
    obses_t, actions, rewards, obses_tp1, dones = [], [], [], [], []
    for i in idxes:
        data = self._storage[i]
        obs_t, action, reward, obs_tp1, done = data
        obses_t.append(np.array(obs_t, copy=False))
        actions.append(np.array(action, copy=False))
        rewards.append(reward)
        obses_tp1.append(np.array(obs_tp1, copy=False))
        dones.append(done)
    return np.array(obses_t), np.array(actions), np.array(rewards), np.array(obses_tp1), np.array(dones)
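
In the code, idxes is a list of 1024 random indices. The for loop visits each element of idxes once (1024 iterations in total), retrieves the corresponding entry from self._storage, and appends its fields to the output lists.

What I am trying to do:
In my new implementation, I am trying to have each process work on its own local portion of idxes, collect the corresponding samples into local buffers, and convert those local buffers to numpy arrays. The samples from all processes are then collected on the root process with the MPI.Comm.gather method. Finally, the gathered samples are concatenated into numpy arrays on the root process and returned.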
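
The split, collect, and concatenate bookkeeping described above can be checked in isolation, without MPI. This is only a sketch: `split_evenly` is a hypothetical standard-library stand-in for `np.array_split`, `fake_storage` is made-up data in place of `self._storage`, and the two "ranks" are simulated by a plain loop.

```python
def split_evenly(items, parts):
    """Split items into `parts` contiguous chunks whose sizes differ by at most one."""
    base, extra = divmod(len(items), parts)
    chunks, start = [], 0
    for rank in range(parts):
        # The first `extra` chunks get one additional element.
        stop = start + base + (1 if rank < extra else 0)
        chunks.append(items[start:stop])
        start = stop
    return chunks


# Made-up stand-in for self._storage: index -> (obs_t, action, reward, obs_tp1, done)
fake_storage = {i: (float(i), i % 4, 1, float(i + 1), False) for i in range(10)}
idxes = [7, 2, 9, 0, 4]
size = 2  # pretend there are two processes

# Each simulated rank collects the samples for its own chunk of indices.
per_rank = [[fake_storage[i] for i in chunk] for chunk in split_evenly(idxes, size)]

# The root flattens the per-rank lists in rank order.
gathered = [sample for chunk in per_rank for sample in chunk]

# Reference result: the plain serial loop over all indices.
serial = [fake_storage[i] for i in idxes]
print(gathered == serial)
```

Because the chunks are contiguous and are joined in rank order, the combined result has the same order as the serial loop.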
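
What the error is:
When I print the lists, I can see that local_obses_t, local_actions, local_rewards, local_obses_tp1, and local_dones contain data, but when I use gather to collect the samples from all processes onto the root process, I get None.
I do not understand the reason behind this error. Can someone help?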

The command I use:
mpiexec -n 2 python program.py
The code works with mpiexec -n 1 python program.py, but it does not work when I increase the value above 1.
Any help would be greatly appreciated.
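
One thing worth checking against this symptom, offered as a possible explanation rather than a confirmed diagnosis: in mpi4py, the lowercase `comm.gather(obj, root=0)` returns the list of per-rank objects only on the root rank and returns `None` on every other rank, whereas `comm.allgather(obj)` returns the full list on all ranks. With `-n 1` the only rank is the root, so the difference never shows up. The sketch below does not use MPI: `simulated_gather` and `simulated_allgather` are hypothetical helpers that only mimic that return convention, and the reward values are made up.

```python
def simulated_gather(per_rank_values, rank, root=0):
    """Mimic the return convention of mpi4py's comm.gather (this is not real MPI)."""
    # Only the root receives the list; every other rank gets None.
    return list(per_rank_values) if rank == root else None


def simulated_allgather(per_rank_values, rank):
    """Mimic the return convention of comm.allgather: every rank gets the full list."""
    return list(per_rank_values)


# Pretend two ranks each hold a local chunk of rewards.
local_rewards = [[1, 0, 1], [0, 1]]

for rank in range(2):
    gathered = simulated_gather(local_rewards, rank, root=0)
    everywhere = simulated_allgather(local_rewards, rank)
    print(rank, gathered, everywhere)
```

If every rank needs the full batch, replacing `comm.gather(..., root=0)` with `comm.allgather(...)`, or following the gather with `comm.bcast(..., root=0)`, is one possible direction to try.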