多处理的数值模拟比希望慢得多:我做错了吗?我可以加快速度吗?

问题描述 投票:3回答:1

我正在运行一组数值模拟。我需要对结果进行一些灵敏度分析,即计算并显示某些输出变化多少,因为某些输入在给定范围内变化。基本上我需要创建一个这样的表,其中每一行是一个模型运行的结果:

+-------------+-------------+-------------+-------------+
|   Input 1   |   Input 2   |  Output 1   |  Output 2   |
+-------------+-------------+-------------+-------------+
| 0.708788979 | 0.614576315 | 0.366315092 | 0.476088865 |
| 0.793662551 | 0.938622754 | 0.898870204 | 0.014915374 |
| 0.366560694 | 0.244354275 | 0.740988568 | 0.197036087 |
+-------------+-------------+-------------+-------------+

每个模型运行都很难并行化,但通过让每个CPU运行具有不同输入的不同模型来并行化并不是太难。

我把多处理库放在一起,但它比我希望的要慢得多。你对我做错了什么/如何加快它有什么建议吗?我愿意使用除多处理之外的库。

它与负载平衡有关吗?我必须承认我是Python中的多处理新手,并且我不太清楚map,apply和apply_async之间的差异。

我做了一个玩具示例来说明我的意思:我从对数正态分布中创建随机样本,并计算我的样本平均值随着分布的均值和西格玛变化而变化的程度。这只是一个平庸的例子,因为这里重要的不是模型本身,而是并行运行多个模型。

在我的例子中,时间(以秒为单位)是:

+-----------------+-----------------+---------------------+
| Million records | Time (parallel) | Time (not parallel) |
+-----------------+-----------------+---------------------+
|               5 | 24.4            | 18                  |
|              10 | 26.5            | 35.8                |
|              20 | 32.2            | 71                  |
+-----------------+-----------------+---------------------+

仅在样本量为5到1000万之间并行化才能带来任何好处。这是预期的吗?

附:我知道用于灵敏度分析的SALib库,但是,据我所知,它没有做我想要的事情。

我的代码:

import numpy as np
import pandas as pd
import time
import multiprocessing
from multiprocessing import Pool

# I store all the possible inputs in a dataframe
tmp = {}
i = 0
for mysigma in np.linspace(0,1,10):
    for mymean in np.linspace(0,1,10):
        i += 1
        tmp[i] = pd.DataFrame({'mean':[mymean],\
           'sigma':[mysigma]})
par_inputs = pd.concat( [tmp[x] for x in tmp], axis=0, ignore_index=True)      


def not_parallel(df):
    for row in df.itertuples(index=True):
        myindex = row[0]
        mymean = row[1]
        mysigma = row[2]
        dist = np.random.lognormal(mymean, mysigma, size = n)
        empmean = dist.mean()
        df.loc[myindex,'empirical mean'] = empmean

    df.to_csv('results not parallel.csv')

# splits the dataframe and sets up the parallelisation
def parallelize_dataframe(df, func):
    df_split = np.array_split(df, num_partitions)
    pool = Pool(num_cores)
    conc_df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    conc_df.to_csv('results parallelized.csv')
    return conc_df

# the actual function being parallelised
def parallel_sensitivities(data):   
    for row in data.itertuples(index=True):
        myindex = row[0]
        mymean = row[1]
        mysigma = row[2]
        dist = np.random.lognormal(mymean, mysigma, size = n)
        empmean = dist.mean()
        print(empmean)
        data.loc[myindex,'empirical mean'] = empmean
    return data


num_cores = multiprocessing.cpu_count()
num_partitions = num_cores
n = int(5e6)

if __name__ == '__main__':

    start = time.time()
    not_parallel(par_inputs)
    time_np = time.time() - start

    start = time.time()
    parallelize_dataframe(par_inputs, parallel_sensitivities)
    time_p = time.time() - start
python pandas multiprocessing
1个回答
1
投票

时间差异用于启动多个进程。要开始每个过程,需要花费一些时间。实际处理时间比非并行更好,但多处理速度提高的一部分是接受启动每个进程所需的时间。

在这种情况下,您的示例函数在几秒钟内相对较快,因此您不会立即在少量记录上看到时间增益。对于每条记录的更密集的操作,您可以通过并行化看到更多的重要时间。

请记住,由于操作系统所需的子进程开销,并行化既昂贵又耗时。与以线性方式运行两个或多个任务相比,并行执行此操作可以节省每个子进程25%到30%的时间,具体取决于您的用例。例如,如果串行执行,则每个消耗5秒的两个任务总共需要10秒,并且在并行化时多平台机器上平均需要大约8秒。这8秒中的3秒可能会丢失到开销,从而限制了您的速度提升。

来自this article.

编辑:

使用Pool()时,您可以选择将任务分配给池。

multiprocessing.apply_asynch() docs用于分配单个任务,以避免在等待任务完成时阻塞。

multiprocessing.map_async docs将通过chunk_size进行迭代,并将每个块添加到池中以完成。

在您的情况下,它将取决于您使用的真实场景,但它们不能基于时间交换,而是基于您需要运行的功能。因为你使用了一个假的例子,我不打算确定你需要哪一个。我猜你可以使用apply_asynch如果你需要运行每个函数并且函数是自包含的。如果函数可以并行运行一个iterable,你会想要map_asynch

© www.soinside.com 2019 - 2024. All rights reserved.