Sypder的多个控制台比多处理要快?

问题描述 投票:0回答:1

我正在对一种定义为一类的交易策略进行回测。我试图选择参数的最佳组合以输入模型,因此我在给定的期间内进行了多次回测,尝试了不同的组合。这个想法是为了能够选择第一代种群来馈入遗传算法。似乎是多处理的完美工作!

因此,我尝试了很多方法来查看更有效的方法。我打开了10个Spyder控制台(是的,我尝试过),并为每个控制台运行了一个参数组合(所有参数同时运行)。

用于每个单个Spyder控制台的示例代码:

class MyStrategy(day,parameters):
   # my strategy that runs on a single day

backtesting=[]
for day in days:
   backtesting_day=MyStrategy(day,single_parameter_combi)
   backtesting.append(backtesting_day)

然后我尝试使用池进行多处理方式。

用于多处理的示例代码:

class MyStrategy(day,parameters):
   # my strategy that runs on a single day

def single_run_backtesting(single_parameter_combi):
   backtesting=[]
   for day in days:
      backtesting_day=MyStrategy(day,single_parameter_combi)
      backtesting.append(backtesting_day)
   return backtesting

def backtest_many(list_of parameter_combinations):
   p=multiprocessing.pool()
   result=p.map(single_run_backtesting,list_of parameter_combinations)
   p.close()
   p.join()
   return result

if __name__ == '__main__':
   parameter_combis=[...] # a list of parameter combinations, 10 different ones in this case
   result = backtest_many(parameter_combis)

我还尝试了以下操作:如下所示,在for循环中打开5个Spyder控制台并运行该类的2个实例,并在单个Spyder控制台中运行具有10个类的实例。

class MyStrategy(day,parameters):
   # my strategy that runs on a single day

parameter_combis=[...] # a list of parameter combinations

backtest_dict={k: [] for k in range(len(parameter_combis)} # make a dictionary of empty lists

for day in days:
   for j,single_parameter_combi in enumerate(parameter_combis):
      backtesting_day=MyStrategy(day,single_parameter_combi)
      backtest_dict[j].append(backtesting_day)

令我惊讶的是,多处理大约一天要花费25分钟,而同一个Spyder控制台在for循环中有10个类的实例大约要花相同的时间,而神奇的是,当我同时运行10个Spyder控制台。如何处理此信息?这对我来说真的没有意义。我正在Windows 10上运行12 CPU的计算机。

[考虑到我计划在96核机器上在AWS上运行某些事情,其中​​约有100种参数组合在遗传算法中交叉运行,而遗传算法应运行约20到30代(完全回测是2个工作月= 44天)。

我的问题是:我想念什么???最重要的是,这仅仅是规模上的差异吗?我知道,例如,如果您定义一个简单的平方函数并连续运行100次,则多处理实际上比for循环要慢。您开始看到大约10000次的优势,例如:https://github.com/vprusso/youtube_tutorials/blob/master/multiprocessing_and_threading/multiprocessing/multiprocessing_pool.py

当多处理多达100种组合时,我会看到性能差异吗?如果是这种情况,是否有任何先进的方法可以知道?我是否正确编写代码?还有其他想法吗?您是否认为如果我要在多个参数的单一参数组合中“多于一个”地使用多步处理,将大大提高速度?

python multiprocessing spyder python-multiprocessing
1个回答
0
投票

展开我的评论“尝试p.imap_unordered()。]:

p.map()确保您以与参数列表中相同的顺序获得结果。为此,一些工人必须闲置一段时间对于您的用例–本质上是对参数组合的网格搜索–您真的不需要按相同的顺序排列它们,而只是想获得最佳选择。 (此外,quoth the documentation,“它可能会导致很长的可迭代对象占用大量内存。请考虑将imap()或imap_unordered()与显式块大小选项一起使用,以提高效率。”]

相比之下,

p.imap_unordered()并不在乎–它只是排队等待工作,工作人员在释放工作时对其进行处理。

[p.imap_unordered()参数也值得尝试-引用chunksize文档,“很长的迭代时间,使用大值的块大小可以使作业完成的速度比使用默认值1快得多。” (因为您花费更少的时间排队和同步事物)。

最后,对于您的特定用例,您可能要考虑让主进程使用生成器函数生成无限数量的参数组合,并在找到足够好的解决方案或足够的时间后中断循环。

一个简单的函数来执行此操作,并遇到一个人为问题(找到两个随机数0..1以使它们的总和最大化)。只要记住也要从worker函数返回原始参数集,否则您将无法访问它! :)

imap()

示例运行:

import random
import multiprocessing
import time


def find_best(*, param_iterable, worker_func, metric_func, max_time, chunksize=10):
    best_result = None
    best_metric = None
    start_time = time.time()
    n_results = 0
    with multiprocessing.Pool() as p:
        for result in p.imap_unordered(worker_func, param_iterable, chunksize=chunksize):
            n_results += 1
            elapsed_time = time.time() - start_time
            metric = metric_func(result)
            if best_metric is None or metric > best_metric:
                print(f'{elapsed_time}: Found new best solution, metric {metric}')
                best_metric = metric
                best_result = result

            if elapsed_time >= max_time:
                print(f'{elapsed_time}: Max time reached.')
                break
    final_time = time.time() - start_time
    print(f'Searched {n_results} results in {final_time} s.')
    return best_result

# ------------

def generate_parameter():
    return {'a': random.random(), 'b': random.random()}


def generate_parameters():
    while True:
        yield generate_parameter()


def my_worker(parameters):
    return {
        'parameters': parameters,  # remember to return this too!
        'value': parameters['a'] + parameters['b'],  # our maximizable metric
    }


def my_metric(result):
    return result['value']


def main():
    result = find_best(
        param_iterable=generate_parameters(),
        worker_func=my_worker,
        metric_func=my_metric,
        max_time=5,
    )
    print(f'Best result: {result}')


if __name__ == '__main__':
    main()
© www.soinside.com 2019 - 2024. All rights reserved.