A loop takes 3 days... how do I parallelize (or vectorize) it?


I have run into a problem computing the decay of products.

The problem is that I need to compute a very large number of combinations to find the decay of the products. The computation itself is simple, but with this many combinations the run time goes through the roof. I have a multiply-nested loop over a multi-index DataFrame, and I thought parallelization might help - but the parallel solution I built only made it slower, so I am clearly doing something wrong.

To make matters worse (time-wise), I have only written the loop for one country so far, and I still need to extend it to the rest of the countries...

The loop itself is fairly simple:

##############################################
#The actual computation
##############################################
import time 
start_time = time.time()
#Loop over gr, l_s, ip and year
for gr in np.arange(-2.0, 2.0, 0.5):  # -2.0, -1.5, ..., 1.5 (np.arange excludes the stop, so gr == 2.0 is skipped)
    for l_s in np.arange(-10.0, 10.0, 1.0):  # likewise skips l_s == 10.0
        for ip in [70, 80, 90]:
            for year in np.arange(1942.0, 2100.0, 1.0):  # Loop over years from 1942.0 to 2099.0
                subs = sales_df.loc[(gr, l_s, ip, year)]['Germany'].values                
                diagonal_values = subs
                result_array = np.zeros((len(diagonal_values) + len(decay) - 1, len(diagonal_values)))

                for i, val in enumerate(diagonal_values):
                    result_array[i:i+len(decay), i] = val * decay
                
                decay_total = result_array.sum(axis=1).round(0)
                decay_total_insert = decay_total[0:300]
                
                # Inserting the decayed values into the DataFrame
                subsetted_df.loc[(gr, l_s, ip, year), 'Germany'] = decay_total_insert
end_time = time.time()
execution_time = end_time - start_time

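For reference, the inner loop builds a shifted, scaled copy of decay for every sale and then sums across columns - that is exactly a discrete convolution of subs with decay. A minimal equivalent sketch, assuming the real groups also have 300 rows like the sample data:

# Sketch: the whole result_array construction collapses to one convolution.
# np.convolve returns len(subs) + len(decay) - 1 values, identical to the row sums.
decay_total = np.convolve(subs, decay).round(0)
decay_total_insert = decay_total[:300]

Dropping the per-sale Python loop and the large temporary array this way is often the single biggest win before any parallelization.
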
For testing purposes I generated some data that looks like the real data (which I cannot share). This generation code could probably be optimized too, but that doesn't matter - I have the real data.

import numpy as np
import pandas as pd
##############################################
#Creating sample data
##############################################
# Sample input data can be done smarter, but doesn't matter since I have real data.
years = np.arange(1942, 2100)
gstep = np.array([-2, -1.5, -1, -0.5, 0, 0.5, 1, 1.5, 2])
lstep = np.arange(-10, 11)
iprange = np.arange(70, 91, step=10)
countries = ["Germany", "Sweden", "Norway", "Austria", "Belgium", "France"]  # Add 17 more countries here

# Generate all possible combinations of input variables
inputcombs = np.array(np.meshgrid(years, gstep, lstep, iprange)).T.reshape(-1, 4)
input_df = pd.DataFrame(inputcombs, columns=["year", "gr", "lschange", "ip"])

# Generating sample sales data
np.random.seed(42)  # For reproducibility
sdata = np.random.randint(0, 100, size=(input_df.shape[0], len(countries)))
input_df[countries] = sdata

# Set the specified columns as the index
input_df.set_index(["gr", "lschange", "ip", "year"], inplace=True)

# Sorting index by specified columns
input_df.sort_index(level=["gr", "lschange", "ip", "year"], inplace=True)

# Create the additional index "actual_year" ranging from 1943 to 2400 (each sales year feeds up to 300 follow-on years)
actual_years = np.arange(1943, 2401)
index_levels = input_df.index.levels + [actual_years]
multi_index = pd.MultiIndex.from_product(index_levels, names=input_df.index.names + ["actual_year"])
output_df = pd.DataFrame(index=multi_index, columns=countries)

# Subset the DataFrame to rows whose actual_year lies within (year, year + 300]
subsetting_condition = (output_df.index.get_level_values('actual_year') >= (output_df.index.get_level_values('year') + 1)) & \
                      (output_df.index.get_level_values('actual_year') <= (output_df.index.get_level_values('year') + 300))
subsetted_df = output_df[subsetting_condition]

# Copy the subsetted frame; this will hold the generated sales figures
sales_df = subsetted_df.copy()
# Get the shape of the existing DataFrame
rows, cols = sales_df.shape

# Generate an array of random integers between 5 and 1000 with the same shape as the DataFrame
random_integers = np.random.randint(5, 1001, size=(rows, cols))

# Replace the DataFrame values with the random integers
sales_df.values[:] = random_integers

##############################################
#Creating decay vector
##############################################

# Parameters for the log-normal distribution
mean = 0.0  # Mean of the log-normal distribution
std_dev = 1.0  # Standard deviation of the log-normal distribution
size = 86  # Number of elements in the vector

# Generate log-normal values
log_normal_values = np.random.lognormal(mean, std_dev, size)

# Normalize the vector to sum up to 1
decay = log_normal_values / np.sum(log_normal_values)

I tried a parallel function using joblib (as in How do I parallelize a simple Python loop?) - it made things slower, and I can see others struggling with the same issue... I also tried vectorizing, but the parts I managed to vectorize did not speed it up (so far), so I went back to the loop - which is, if nothing else, a relatively easy process to follow.

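For what it's worth, the computation can be vectorized across all groups at once, because decay is identical everywhere and each (gr, lschange, ip, year) group contributes exactly 300 contiguous rows in the sorted index. A sketch under those assumptions (untested against the real data):

# Hedged sketch: one matrix product instead of ~90k Python-level iterations.
# Assumes sales_df and subsetted_df share the same sorted MultiIndex and
# every group has exactly 300 actual_year rows; chunk the group axis if
# the intermediate (n_groups x 385) array is too large for memory.
n = 300
M = np.zeros((n + len(decay) - 1, n))
for i in range(n):
    M[i:i + len(decay), i] = decay  # same band structure the loop builds

vals = sales_df['Germany'].to_numpy(dtype=float).reshape(-1, n)  # (n_groups, 300)
out = (vals @ M.T)[:, :n].round(0)  # convolves every group in one BLAS call
subsetted_df['Germany'] = out.ravel()

The same reshape trick extends to all countries at once by keeping the country axis, which would also cover the "remaining countries" concern above.
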
1 Answer

Parallelizing nested loops can be challenging because of the dependencies between iterations and the overhead associated with creating and managing parallel threads or processes. In your case, however, you could consider parallelizing the outermost loop while keeping the inner loops sequential. Here is an example of how to achieve this with multithreading via the concurrent.futures module:

import time
import numpy as np
import pandas as pd
from concurrent.futures import ThreadPoolExecutor

# ... (Your data generation code)

# Function to process a single combination
def process_combination(gr, l_s, ip, year):
    subs = sales_df.loc[(gr, l_s, ip, year)]['Germany'].values                
    diagonal_values = subs
    result_array = np.zeros((len(diagonal_values) + len(decay) - 1, len(diagonal_values)))

    for i, val in enumerate(diagonal_values):
        result_array[i:i+len(decay), i] = val * decay

    decay_total = result_array.sum(axis=1).round(0)
    decay_total_insert = decay_total[0:300]

    return (gr, l_s, ip, year), decay_total_insert

# Parallel processing
start_time = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(process_combination, gr, l_s, ip, year)
               for gr in np.arange(-2.0, 2.0, 0.5)
               for l_s in np.arange(-10.0, 10.0, 1.0)
               for ip in [70, 80, 90]
               for year in np.arange(1942.0, 2100.0, 1.0)]

    for future in futures:
        idx, decay_total_insert = future.result()
        subsetted_df.loc[idx, 'Germany'] = decay_total_insert

end_time = time.time()
execution_time = end_time - start_time

The ThreadPoolExecutor is used to parallelize the outermost loop; the inner loops stay sequential within each thread. The number of workers (max_workers) should be adjusted to the cores available on your system (os.cpu_count() is a reasonable starting point); you may have 32.

Because of overhead and the global interpreter lock (GIL) in CPython, parallelization does not always guarantee a speedup...

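If each task carried more work, a process pool would be the usual way around the GIL for CPU-bound code. A minimal sketch, assuming the fork start method (the Linux default) so that workers inherit module-level globals such as sales_df and decay:

from concurrent.futures import ProcessPoolExecutor

combos = [(gr, l_s, ip, year)
          for gr in np.arange(-2.0, 2.0, 0.5)
          for l_s in np.arange(-10.0, 10.0, 1.0)
          for ip in [70, 80, 90]
          for year in np.arange(1942.0, 2100.0, 1.0)]

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(process_combination, *c) for c in combos]
        for future in futures:
            idx, decayed = future.result()
            subsetted_df.loc[idx, 'Germany'] = decayed

Note, however, that each call to process_combination does only a tiny amount of NumPy work, so pickling and scheduling overhead can easily dominate - consistent with the slowdown the question reports for joblib. Batching many combinations per task, or vectorizing the computation outright, is usually the bigger win.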