How to resample an OHLC DataFrame in Python without peeking into the future?


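I have a Pandas DataFrame with a one-second-frequency datetime index and the columns "Open", "High", "Low", "Close", which represent the prices of a financial instrument.

I want to resample this DataFrame to 15 minutes (or any other frequency) without looking into the future. The result should keep the original one-second frequency and add four new columns for the candle. The goal is to show, in real time, how each candle forms.

For example, for 15-minute candles I would add four new columns to the original DataFrame, named "Open_15m", "High_15m", "Low_15m", "Close_15m", whose values update every second as a rolling OHLC.

Keep in mind that a 15-minute candle can only start at hh:00:00, hh:15:00, hh:30:00 or hh:45:00. This means that if the DataFrame starts at 09:00:00, the OHLC rolls from 09:00:00 up to 09:15:00, then resets and starts over, because a new 15-minute candle begins forming at 09:15:00.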
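To make the reset rule concrete, here is a minimal sketch of how flooring a timestamp to 15 minutes assigns each second to its candle. The sample timestamps are made up for illustration; only `floor("15min")` is taken from the code in this question.

```python
import pandas as pd

# Hypothetical sample timestamps straddling a 15-minute boundary.
idx = pd.to_datetime(
    [
        "2023-01-01 09:00:00",
        "2023-01-01 09:14:59",
        "2023-01-01 09:15:00",
        "2023-01-01 09:29:59",
    ]
)

# floor("15min") maps every timestamp to the start of its candle.
candle_start = idx.floor("15min")

for ts, start in zip(idx, candle_start):
    print(ts, "->", start)
```

The first two timestamps fall into the candle starting at 09:00:00 and the last two into the candle starting at 09:15:00, so any running statistic has to restart between the second and third row.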

I came up with code that does this, and I believe it is correct, but it is far too slow for DataFrames with millions of rows. If the code is correct, it needs to be sped up somehow, for example with NumPy or Numba.

# Function to find the nearest 15-minute floor
def nearest_quarter_hour(timestamp):
    return timestamp.floor('15min')

# Find the nearest 15-minute floor for each timestamp
df['15_min_floor'] = df.index.map(nearest_quarter_hour)

# Group by the nearest 15-minute floor and calculate rolling OHLC
rolling_df = df.groupby('15_min_floor').rolling(window='15min').agg({
    'Open': lambda x: x.iloc[0],    # First value in the window
    'High': 'max',
    'Low': 'min',
    'Close': lambda x: x.iloc[-1]   # Last value in the window
}).reset_index(level=0, drop=True)

# Add _15 to each column of the rolling df
rolling_df.columns = [f'{col}_15' for col in rolling_df.columns]

# Merge with original DataFrame
result_df = pd.concat([df, rolling_df], axis=1)


python pandas dataframe finance numba
1 Answer
Here is a numba version that computes the OHLC the way you describe and is significantly faster:

import numpy as np
from numba import njit


@njit
def compute_ohlc(floor_15_min, O, H, L, C, O_out, H_out, L_out, C_out):
    first, curr_max, curr_min, last = O[0], H[0], L[0], C[0]

    last_v = floor_15_min[0]
    for i, v in enumerate(floor_15_min):
        if v != last_v:
            # A new 15-minute candle starts: reset the running values
            first, curr_max, curr_min, last = O[i], H[i], L[i], C[i]
            last_v = v
        else:
            # Same candle: update the running high, low and close
            curr_max = max(curr_max, H[i])
            curr_min = min(curr_min, L[i])
            last = C[i]

        O_out[i] = first
        H_out[i] = curr_max
        L_out[i] = curr_min
        C_out[i] = last


def compute_numba(df):
    floor_15_min = df.index.floor("15min").to_numpy()

    # Preallocate writable output buffers; writing through df[...].values
    # is unreliable when pandas hands back a copy or a read-only view.
    O_out, H_out, L_out, C_out = (np.empty(len(df)) for _ in range(4))

    compute_ohlc(
        floor_15_min,
        df["Open"].to_numpy(),
        df["High"].to_numpy(),
        df["Low"].to_numpy(),
        df["Close"].to_numpy(),
        O_out,
        H_out,
        L_out,
        C_out,
    )

    df["15_min_floor_2"] = floor_15_min
    df["Open_15_2"] = O_out
    df["High_15_2"] = H_out
    df["Low_15_2"] = L_out
    df["Close_15_2"] = C_out


compute_numba(df)

Benchmark on a random df with 432001 rows:

from timeit import timeit

import numpy as np
import pandas as pd
from numba import njit


# generate some random data:

np.random.seed(42)

idx = pd.date_range("1-1-2023", "1-6-2023", freq="1000ms")
df = pd.DataFrame(
    {
        "Open": 50 + np.random.random(len(idx)) * 100,
        "High": 50 + np.random.random(len(idx)) * 100,
        "Low": 50 + np.random.random(len(idx)) * 100,
        "Close": 50 + np.random.random(len(idx)) * 100,
    },
    index=idx,
)


def get_result_df(df):
    def nearest_quarter_hour(timestamp):
        return timestamp.floor("15min")

    # Find the nearest 15-minute floor for each timestamp
    df["15_min_floor"] = df.index.map(nearest_quarter_hour)

    # Group by the nearest 15-minute floor and calculate rolling OHLC
    rolling_df = (
        df.groupby("15_min_floor")
        .rolling(window="15min")
        .agg(
            {
                "Open": lambda x: x.iloc[0],  # First value in the window
                "High": "max",
                "Low": "min",
                "Close": lambda x: x.iloc[-1],  # Last value in the window
            }
        )
        .reset_index(level=0, drop=True)
    )

    # add _15 to each column rolling df
    rolling_df.columns = [f"{col}_15" for col in rolling_df.columns]

    # Merge with original DataFrame
    result_df = pd.concat([df, rolling_df], axis=1)

    return result_df


@njit
def compute_ohlc(floor_15_min, O, H, L, C, O_out, H_out, L_out, C_out):
    first, curr_max, curr_min, last = O[0], H[0], L[0], C[0]

    last_v = floor_15_min[0]
    for i, v in enumerate(floor_15_min):
        if v != last_v:
            first, curr_max, curr_min, last = O[i], H[i], L[i], C[i]
            last_v = v
        else:
            curr_max = max(curr_max, H[i])
            curr_min = min(curr_min, L[i])
            last = C[i]

        O_out[i] = first
        H_out[i] = curr_max
        L_out[i] = curr_min
        C_out[i] = last


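def compute_numba(df):
    floor_15_min = df.index.floor("15min").to_numpy()

    # Preallocate writable output buffers; writing through df[...].values
    # is unreliable when pandas hands back a copy or a read-only view.
    O_out, H_out, L_out, C_out = (np.empty(len(df)) for _ in range(4))

    compute_ohlc(
        floor_15_min,
        df["Open"].to_numpy(),
        df["High"].to_numpy(),
        df["Low"].to_numpy(),
        df["Close"].to_numpy(),
        O_out,
        H_out,
        L_out,
        C_out,
    )

    df["15_min_floor_2"] = floor_15_min
    df["Open_15_2"] = O_out
    df["High_15_2"] = H_out
    df["Low_15_2"] = L_out
    df["Close_15_2"] = C_out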


t1 = timeit("get_result_df(df)", number=1, globals=globals())
t2 = timeit("compute_numba(df)", number=1, globals=globals())

print(f"Time normal = {t1}")
print(f"Time numba =  {t2}")

On my machine (AMD 5700x) this prints:

Time normal = 29.57983471499756
Time numba =  0.2751060768496245
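As a cross-check that does not depend on numba, the same running OHLC can probably also be expressed with pandas group-wise cumulative operations. The sketch below is not part of the benchmark above: the function name `expanding_ohlc`, the `_15m` column suffix (borrowed from the question) and the sample data are illustrative, and it assumes a sorted index and no NaN values in the price columns.

```python
import pandas as pd


def expanding_ohlc(df, freq="15min"):
    """Running OHLC per candle, using only the current and earlier rows."""
    # Candle start for each row; rows with equal keys belong to one candle.
    key = df.index.floor(freq)
    grouped = df.groupby(key)

    out = pd.DataFrame(index=df.index)
    # First open of the candle (known from the candle's first row onward).
    out["Open_15m"] = grouped["Open"].transform("first")
    # Highest high and lowest low seen so far within the candle.
    out["High_15m"] = grouped["High"].cummax()
    out["Low_15m"] = grouped["Low"].cummin()
    # The running close is simply the close of the current row.
    out["Close_15m"] = df["Close"]
    return out


# Hypothetical one-second data crossing the 09:15:00 boundary.
idx = pd.date_range("2023-01-01 09:14:58", periods=4, freq="1s")
sample = pd.DataFrame(
    {
        "Open": [10.0, 11.0, 12.0, 13.0],
        "High": [10.5, 11.8, 12.2, 13.9],
        "Low": [9.5, 10.1, 11.7, 12.4],
        "Close": [10.2, 11.5, 12.0, 13.5],
    },
    index=idx,
)

result = expanding_ohlc(sample)
print(result)
```

In this sample the running open and low restart at 09:15:00 (the third row), which is the reset behaviour the question asks for. Comparing these columns against the numba output on the same DataFrame is one way to validate both implementations.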
