如何减少pandas上聚合多个函数调用的时间计算

问题描述 投票:0回答:1

我正在寻找一种方法来减少熊猫数据帧上多个函数计算的时间计算,即我正在尝试做的事情。 在我的电脑上需要3分钟多。是否存在与带有聚合函数的 pandarallel 等效的函数或与 boost 计算等效的函数。 pandarallel 似乎不适用于聚合。

这是我使用的代码:

import pandas as pd
import numpy as np

from empyrical import (cagr, 
                    annual_volatility,
                    max_drawdown,
)

import warnings
warnings.filterwarnings('ignore')


# Exemple 
N = 10000
mu = 0.1/252
sigma = 0.15/np.sqrt(250)

# Créer un DataFrame vide pour stocker les séries temporelles
date_range = pd.date_range(start='1990-01-01', end='2020-01-01', freq='B')
# Créer un DataFrame vide avec l'index de dates
df = pd.DataFrame(index=date_range)


# Générer les séries temporelles
for i in range(N):
    series = (100+ np.random.normal(mu, sigma, len(date_range)).cumsum())
    df[f"Série {i+1}"] = series

tab =  (df
        .sort_index()
        .aggregate([
            # Date de première/dernière VL
            lambda x: x.first_valid_index().date(),
            lambda x: x.last_valid_index().date(),
            # Mesure de performance 
            ## Perf total
            lambda x: 100*cagr(x[-52:].pct_change(), period='weekly') if x[-52:].isnull().sum() <= 1 else np.nan,
            lambda x: 100*cagr(x[-3*52:].pct_change(), period='weekly'),
            lambda x: 100*cagr(x[-5*52:].pct_change(), period='weekly'),
            # Mesure de risque
            # Volatilité
            lambda x: 100*annual_volatility(x[-52:].pct_change(), period='weekly'),
            lambda x: 100*annual_volatility(x[-3*52:].pct_change(), period='weekly'),
            lambda x: 100*annual_volatility(x[-5*52:].pct_change(), period='weekly'),
            ## Max DD 
            lambda x: 100*max_drawdown(x[-52:].pct_change()),
            lambda x: 100*max_drawdown(x[-3*52:].pct_change()),
            lambda x: 100*max_drawdown(x[-5*52:].pct_change()),
            ],)
        .set_axis([
            'Date de début', 'Date de fin',
            'Perf 1 an', 'Perf 3 ans', 'Perf 5 ans', 
            'Volat 1 an', 'Volat 3 ans', 'Volat 5 ans',
            'Max DD 1 an', 'Max DD 3 ans', 'Max DD 5 ans',
            ])
        .T
        .dropna()
        )

tab 

python pandas dataframe aggregate
1个回答
0
投票

您可以尝试使用

multiprocessing.Pool
在单独的进程中处理每个
pd.Series

import multiprocessing as mp
import time
import warnings

import numpy as np
import pandas as pd
from empyrical import annual_volatility, cagr, max_drawdown

warnings.filterwarnings("ignore")

agg_funcs = [
    # Date de première/dernière VL
    lambda x: x.first_valid_index().date(),
    lambda x: x.last_valid_index().date(),
    # Mesure de performance
    ## Perf total
    lambda x: (
        100 * cagr(x[-52:].pct_change(), period="weekly")
        if x[-52:].isnull().sum() <= 1
        else np.nan
    ),
    lambda x: 100 * cagr(x[-3 * 52 :].pct_change(), period="weekly"),
    lambda x: 100 * cagr(x[-5 * 52 :].pct_change(), period="weekly"),
    # Mesure de risque
    # Volatilité
    lambda x: 100 * annual_volatility(x[-52:].pct_change(), period="weekly"),
    lambda x: 100 * annual_volatility(x[-3 * 52 :].pct_change(), period="weekly"),
    lambda x: 100 * annual_volatility(x[-5 * 52 :].pct_change(), period="weekly"),
    ## Max DD
    lambda x: 100 * max_drawdown(x[-52:].pct_change()),
    lambda x: 100 * max_drawdown(x[-3 * 52 :].pct_change()),
    lambda x: 100 * max_drawdown(x[-5 * 52 :].pct_change()),
]

agg_labels = [
    "Date de début",
    "Date de fin",
    "Perf 1 an",
    "Perf 3 ans",
    "Perf 5 ans",
    "Volat 1 an",
    "Volat 3 ans",
    "Volat 5 ans",
    "Max DD 1 an",
    "Max DD 3 ans",
    "Max DD 5 ans",
]


def calculate_agg(series):
    name, x = series
    out = pd.Series([fn(x) for fn in agg_funcs], index=agg_labels, name=name)
    return out


if __name__ == "__main__":

    # Exemple
    N = 10000
    # N = 10
    mu = 0.1 / 252
    sigma = 0.15 / np.sqrt(250)

    # Créer un DataFrame vide pour stocker les séries temporelles
    date_range = pd.date_range(start="1990-01-01", end="2020-01-01", freq="B")
    # Créer un DataFrame vide avec l'index de dates
    df = pd.DataFrame(index=date_range)

    # Générer les séries temporelles
    np.random.seed(42)
    for i in range(N):
        series = 100 + np.random.normal(mu, sigma, len(date_range)).cumsum()
        df[f"Série {i+1}"] = series

    start_time = time.time()

    df_out = []
    with mp.Pool(processes=16) as pool:   # <-- adjust number of processes accordingly
        for result in pool.imap_unordered(
            calculate_agg, ((c, df[c]) for c in df.columns)
        ):
            df_out.append(result)

    df_out = pd.concat(df_out, axis=1).T

    end_time = time.time()
    print(f"--- {time.time() - start_time} seconds ---")
    print("Final shape:", df_out.shape)

在我的计算机上打印(AMD 5700x):

--- 5.798572540283203 seconds ---
Final shape: (10000, 11)
© www.soinside.com 2019 - 2024. All rights reserved.