How to parallelize a "for loop" in Python using MPI


I want to parallelize the iterations of a "for loop" in Python using OpenMP or an MPI scatter/gather. The code is shown below. I have also gone through the MPI for Python (mpi4py) documentation and used it as the starting point for my code.

Here is my code:

from mpi4py import MPI
import numpy as np

def _encode_sample(self, idxes):
        comm = MPI.COMM_WORLD
        rank = comm.Get_rank()
        size = comm.Get_size()
        #print(size, "this is size--num of processes")

        # Split the indices into equal parts
        local_idxes = np.array_split(idxes, size)[rank]

        # Initialize local buffers for the samples
        local_obses_t, local_actions, local_rewards, local_obses_tp1, local_dones = [], [], [], [], []

        # Collect samples for the local indices
        for i in local_idxes:
            data = self._storage[i]
            obs_t, action, reward, obs_tp1, done = data
            local_obses_t.append(obs_t)
            local_actions.append(action)
            local_rewards.append(reward)
            local_obses_tp1.append(obs_tp1)
            local_dones.append(done)

        # Convert local lists to numpy arrays
        local_obses_t = np.array(local_obses_t, dtype=np.float64)
        local_actions = np.array(local_actions, dtype=np.float32)
        local_rewards = np.array(local_rewards, dtype=np.int64)
        local_obses_tp1 = np.array(local_obses_tp1, dtype=np.float64)
        local_dones = np.array(local_dones, dtype=np.float32)

        # Print out the local numpy arrays
        print("Local obses_t:", len(local_obses_t))
        print("Local obses_tp1:", len(local_obses_tp1))

        # Gather the samples from all processes to the root process
        obses_t = comm.gather(local_obses_t, root=0)
        actions = comm.gather(local_actions, root=0)
        rewards = comm.gather(local_rewards, root=0)
        obses_tp1 = comm.gather(local_obses_tp1, root=0)
        dones = comm.gather(local_dones, root=0)

        # Print out the gathered numpy arrays
        if rank == 0:
            print("Gathered obses_t:", obses_t)
            print("Gathered obses_tp1:", obses_tp1)

            # Concatenate the gathered samples into numpy arrays
            obses_t = np.concatenate(obses_t, axis=0)
            actions = np.concatenate(actions, axis=0)
            rewards = np.concatenate(rewards, axis=0)
            obses_tp1 = np.concatenate(obses_tp1, axis=0)
            dones = np.concatenate(dones, axis=0)


            return obses_t, actions, rewards, obses_tp1, dones

Original code:

def _encode_sample(self, idxes):
        obses_t, actions, rewards, obses_tp1, dones = [], [], [], [], []
        for i in idxes:
            data = self._storage[i]
            obs_t, action, reward, obs_tp1, done = data
            obses_t.append(np.array(obs_t, copy=False))
            actions.append(np.array(action, copy=False))
            rewards.append(reward)
            obses_tp1.append(np.array(obs_tp1, copy=False))
            dones.append(done)
        return np.array(obses_t), np.array(actions), np.array(rewards), np.array(obses_tp1), np.array(dones)

In this code, idxes is a list of 1024 random indices. The for loop runs over each of the 1024 elements of idxes, retrieves the corresponding entry stored in self._storage, and collects its fields.
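Just to be explicit about how those 1024 indices end up distributed when I split them for MPI, here is a plain illustration with no MPI involved (the index values are a stand-in for my real list):

import numpy as np

idxes = np.random.randint(0, 100000, size=1024)   # stand-in for the real index list
size = 2                                          # number of MPI processes I launch

chunks = np.array_split(idxes, size)              # one chunk per rank, sizes differ by at most 1
for rank, chunk in enumerate(chunks):
    print("rank", rank, "gets", chunk.shape[0], "indices")   # 512 each with 2 ranks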

What I am trying to do:

In my new implementation, I am trying to have each process work on its own local part of idxes, collect the corresponding samples into local buffers, and convert those buffers into NumPy arrays. Then the samples from all processes are gathered onto the root process with the MPI.Comm.gather method. Finally, the gathered samples are concatenated into NumPy arrays on the root process and returned.
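As a sanity check, this is a stripped-down version of the pattern I am aiming for, independent of the replay-buffer code (it assumes only mpi4py and NumPy; save it as e.g. gather_demo.py and run it with mpiexec -n 2 python gather_demo.py):

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# Every rank knows the full index list; each rank keeps only its slice
idxes = np.arange(1024)
local_idxes = np.array_split(idxes, size)[rank]

# Each rank builds a local NumPy array (here just a dummy transformation)
local_result = local_idxes.astype(np.float64) ** 2

# Lower-case gather pickles arbitrary Python objects; the root receives a
# list with one array per rank, the other ranks receive None
gathered = comm.gather(local_result, root=0)

if rank == 0:
    full = np.concatenate(gathered, axis=0)
    print("gathered", len(gathered), "chunks, total samples:", full.shape[0])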

What the error is:

When I print the lists, I can see that local_obses_t, local_actions, local_rewards, local_obses_tp1 and local_dones do contain data, but somehow, when I use gather to collect all the samples from every process onto the root process, I get an error.

I do not understand the reason behind this error. Can anyone help?

The command I used:

mpiexec -n 2 python program.py

The code works with mpiexec -n 1 python program.py, but it stops working as soon as I increase the number of processes above 1.
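One thing I suspect, though I am not sure (I have not included the traceback): with mpiexec -n 1 the only process is rank 0, so the final return inside if rank == 0: always runs, but with two or more processes the non-root ranks fall off the end of _encode_sample and implicitly return None, which would break any caller that tries to unpack five values. If that is the real problem, I imagine something along these lines could work; this is an untested sketch, and gather_and_share is just an illustrative helper name:

from mpi4py import MPI
import numpy as np

def gather_and_share(comm, local_array):
    # Gather the per-rank arrays onto the root, concatenate them there,
    # then broadcast the result so every rank gets the full array back
    # instead of None.
    gathered = comm.gather(local_array, root=0)
    full = np.concatenate(gathered, axis=0) if comm.Get_rank() == 0 else None
    return comm.bcast(full, root=0)

if __name__ == "__main__":
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    local = np.full(4, rank, dtype=np.float64)   # each rank contributes 4 values
    full = gather_and_share(comm, local)
    print(rank, full.shape)                      # every rank prints (4 * size,)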

Any help would be appreciated.

Tags: python, performance, parallel-processing, mpi, openmp