我正在运行一组数值模拟。我需要对结果进行一些灵敏度分析,即计算并显示某些输出变化多少,因为某些输入在给定范围内变化。基本上我需要创建一个这样的表,其中每一行是一个模型运行的结果:
+-------------+-------------+-------------+-------------+
| Input 1 | Input 2 | Output 1 | Output 2 |
+-------------+-------------+-------------+-------------+
| 0.708788979 | 0.614576315 | 0.366315092 | 0.476088865 |
| 0.793662551 | 0.938622754 | 0.898870204 | 0.014915374 |
| 0.366560694 | 0.244354275 | 0.740988568 | 0.197036087 |
+-------------+-------------+-------------+-------------+
每个模型运行都很难并行化,但通过让每个CPU运行具有不同输入的不同模型来并行化并不是太难。
我把多处理库放在一起,但它比我希望的要慢得多。你对我做错了什么/如何加快它有什么建议吗?我愿意使用除多处理之外的库。
它与负载平衡有关吗?我必须承认我是Python中的多处理新手,并且我不太清楚map,apply和apply_async之间的差异。
我做了一个玩具示例来说明我的意思:我从对数正态分布中创建随机样本,并计算我的样本平均值随着分布的均值和西格玛变化而变化的程度。这只是一个平庸的例子,因为这里重要的不是模型本身,而是并行运行多个模型。
在我的例子中,时间(以秒为单位)是:
+-----------------+-----------------+---------------------+
| Million records | Time (parallel) | Time (not parallel) |
+-----------------+-----------------+---------------------+
| 5 | 24.4 | 18 |
| 10 | 26.5 | 35.8 |
| 20 | 32.2 | 71 |
+-----------------+-----------------+---------------------+
仅在样本量为5到1000万之间并行化才能带来任何好处。这是预期的吗?
附:我知道用于灵敏度分析的SALib库,但是,据我所知,它没有做我想要的事情。
我的代码:
import numpy as np
import pandas as pd
import time
import multiprocessing
from multiprocessing import Pool
# I store all the possible inputs in a dataframe
tmp = {}
i = 0
for mysigma in np.linspace(0,1,10):
for mymean in np.linspace(0,1,10):
i += 1
tmp[i] = pd.DataFrame({'mean':[mymean],\
'sigma':[mysigma]})
par_inputs = pd.concat( [tmp[x] for x in tmp], axis=0, ignore_index=True)
def not_parallel(df):
for row in df.itertuples(index=True):
myindex = row[0]
mymean = row[1]
mysigma = row[2]
dist = np.random.lognormal(mymean, mysigma, size = n)
empmean = dist.mean()
df.loc[myindex,'empirical mean'] = empmean
df.to_csv('results not parallel.csv')
# splits the dataframe and sets up the parallelisation
def parallelize_dataframe(df, func):
df_split = np.array_split(df, num_partitions)
pool = Pool(num_cores)
conc_df = pd.concat(pool.map(func, df_split))
pool.close()
pool.join()
conc_df.to_csv('results parallelized.csv')
return conc_df
# the actual function being parallelised
def parallel_sensitivities(data):
for row in data.itertuples(index=True):
myindex = row[0]
mymean = row[1]
mysigma = row[2]
dist = np.random.lognormal(mymean, mysigma, size = n)
empmean = dist.mean()
print(empmean)
data.loc[myindex,'empirical mean'] = empmean
return data
num_cores = multiprocessing.cpu_count()
num_partitions = num_cores
n = int(5e6)
if __name__ == '__main__':
start = time.time()
not_parallel(par_inputs)
time_np = time.time() - start
start = time.time()
parallelize_dataframe(par_inputs, parallel_sensitivities)
time_p = time.time() - start
时间差异用于启动多个进程。要开始每个过程,需要花费一些时间。实际处理时间比非并行更好,但多处理速度提高的一部分是接受启动每个进程所需的时间。
在这种情况下,您的示例函数在几秒钟内相对较快,因此您不会立即在少量记录上看到时间增益。对于每条记录的更密集的操作,您可以通过并行化看到更多的重要时间。
请记住,由于操作系统所需的子进程开销,并行化既昂贵又耗时。与以线性方式运行两个或多个任务相比,并行执行此操作可以节省每个子进程25%到30%的时间,具体取决于您的用例。例如,如果串行执行,则每个消耗5秒的两个任务总共需要10秒,并且在并行化时多平台机器上平均需要大约8秒。这8秒中的3秒可能会丢失到开销,从而限制了您的速度提升。
编辑:
使用Pool()
时,您可以选择将任务分配给池。
multiprocessing.apply_asynch()
docs用于分配单个任务,以避免在等待任务完成时阻塞。
multiprocessing.map_async
docs将通过chunk_size
进行迭代,并将每个块添加到池中以完成。
在您的情况下,它将取决于您使用的真实场景,但它们不能基于时间交换,而是基于您需要运行的功能。因为你使用了一个假的例子,我不打算确定你需要哪一个。我猜你可以使用apply_asynch
如果你需要运行每个函数并且函数是自包含的。如果函数可以并行运行一个iterable,你会想要map_asynch
。