How to compute a rolling window, then groupby, then aggregate, without looping?


I have a pandas DataFrame, updates, that looks like this:

                        streamid,low,high
time
2023-01-10 16:07:36.264,979,1.07331,1.07344
2023-01-10 16:07:36.359,1009,1.07331,1.07338
2023-01-10 16:07:36.444,781,1.07329,1.07341
2023-01-10 16:07:36.464,979,1.07331,1.07344
2023-01-10 16:07:36.470,1191,1.07331,1.0734
2023-01-10 16:07:36.480,1191,1.07333,1.07342
2023-01-10 16:07:36.493,2,1.07332,1.07337
2023-01-10 16:07:36.493,1009,1.07332,1.07338
2023-01-10 16:07:36.494,979,1.07332,1.07345
2023-01-10 16:07:36.494,786,1.07325,1.07332
2023-01-10 16:07:36.494,141,1.07332,1.07337
2023-01-10 16:07:36.496,1263,1.07332,1.07339
2023-01-10 16:07:36.496,818,1.07331,1.07338
2023-01-10 16:07:36.497,786,1.07325,1.07333
2023-01-10 16:07:36.499,844,1.07331,1.07336
2023-01-10 16:07:36.499,1009,1.07332,1.07339
2023-01-10 16:07:36.501,1028,1.07333,1.07337
2023-01-10 16:07:36.503,141,1.07333,1.07338
2023-01-10 16:07:36.504,1009,1.07333,1.0734
2023-01-10 16:07:36.509,1009,1.07333,1.07341
2023-01-10 16:07:36.509,786,1.07327,1.07335

I want to compute the rolling max of low and the rolling min of high over a 5s window, with one caveat: if, within any window, there are multiple rows with the same streamid, only the most recent of those rows should be considered.

Conceptually this should be simple: all I need to do is take each 5s rolling window, group it by streamid, call last() on the GroupBy object to get the correct row for each group, and then run agg({"low": "max", "high": "min"}) over the window.
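On a single window, that sequence can be sketched as follows. The three-row frame below is a made-up stand-in for one 5s window (with streamid 979 duplicated), not the real data:

```python
import pandas as pd

# Toy stand-in for one 5s window; 979 appears twice, and only
# its latest row should count.
window = pd.DataFrame(
    {"streamid": [979, 1009, 979],
     "low": [1.07331, 1.07331, 1.07332],
     "high": [1.07344, 1.07338, 1.07345]}
)

# Keep the latest row per streamid, then aggregate across groups.
result = window.groupby("streamid").last().agg({"low": "max", "high": "min"})
print(result["low"], result["high"])  # 1.07332 1.07338
```

The stale 979 row (low=1.07331, high=1.07344) is discarded before the max/min is taken, which is exactly the caveat described above.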

In practice, I have found that I cannot do a groupby over a rolling window, because the rolling operation is applied to each column separately. I know I can use rolling with method='table' and apply with engine='numba', raw=True to get the whole frame inside a custom function, but I cannot do a groupby inside a numba function.

Here is my loop-based solution, which is very slow but gives the correct answer:

import pandas as pd

lows = []
highs = []
times = []
rolled = past_updates.rolling("5s")
for df in rolled:  # each iteration yields one 5s window as a DataFrame
    # keep only the latest row per streamid, then aggregate
    low, high = df.groupby("streamid").last().agg({"low": "max", "high": "min"})
    lows.append(low)
    highs.append(high)
    times.append(df.index[-1])
out_df = pd.DataFrame({
    "low": lows, "high": highs, "time": times
}).set_index("time")

This gives the result:

                        low,high
time
2023-01-10 16:07:36.264,1.07331,1.07344
2023-01-10 16:07:36.359,1.07331,1.07338
2023-01-10 16:07:36.444,1.07331,1.07338
2023-01-10 16:07:36.464,1.07331,1.07338
2023-01-10 16:07:36.470,1.07331,1.07338
2023-01-10 16:07:36.480,1.07333,1.07338
2023-01-10 16:07:36.493,1.07333,1.07337
2023-01-10 16:07:36.493,1.07333,1.07337
2023-01-10 16:07:36.494,1.07333,1.07337
2023-01-10 16:07:36.494,1.07333,1.07332
2023-01-10 16:07:36.494,1.07333,1.07332
2023-01-10 16:07:36.496,1.07333,1.07332
2023-01-10 16:07:36.496,1.07333,1.07332
2023-01-10 16:07:36.497,1.07333,1.07333  # <-- 1.07332 is dropped due to new update from 786
2023-01-10 16:07:36.499,1.07333,1.07333
2023-01-10 16:07:36.499,1.07333,1.07333
2023-01-10 16:07:36.501,1.07333,1.07333
2023-01-10 16:07:36.503,1.07333,1.07333
2023-01-10 16:07:36.504,1.07333,1.07333
2023-01-10 16:07:36.509,1.07333,1.07333
2023-01-10 16:07:36.509,1.07333,1.07335
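As an aside, within a single window the groupby("streamid").last() step is equivalent, for this aggregation (assuming no NaNs in the columns), to drop_duplicates keeping the last occurrence, which avoids building a GroupBy object per window. A sketch on a made-up three-row window:

```python
import pandas as pd

# Toy stand-in for one 5s window with a duplicated streamid.
window = pd.DataFrame(
    {"streamid": [979, 1009, 979],
     "low": [1.07331, 1.07331, 1.07332],
     "high": [1.07344, 1.07338, 1.07345]}
)

via_groupby = window.groupby("streamid").last().agg({"low": "max", "high": "min"})
via_dedup = (window
             .drop_duplicates(subset="streamid", keep="last")
             .agg({"low": "max", "high": "min"}))
print(via_groupby.equals(via_dedup))  # both give low=1.07332, high=1.07338
```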

Unfortunately, this takes 7-8 minutes for roughly 600k rows, and I want to run it repeatedly for different sets of timestamps. Is there a better way, ideally one that avoids the loop?

python pandas group-by rolling-computation
1 Answer

The caveat is the tricky part. Do you want to keep the duplicate streamids? If not, drop the duplicates on streamid:

df.drop_duplicates(subset='streamid', keep='last', inplace=True)

Then reassign low and high:

df.low = df.low.rolling(window="5s").max()
df.high = df.high.rolling(window="5s").min()
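To make this concrete, here is a runnable sketch on a hypothetical three-row slice of the data. Note one caveat of this approach: drop_duplicates removes stale rows per streamid across the whole frame, not per 5s window, so a row that was superseded later disappears from earlier windows too, which is not quite what the questioner's loop computes.

```python
import pandas as pd

# Hypothetical three-row slice illustrating the suggested approach.
idx = pd.to_datetime([
    "2023-01-10 16:07:36.264",
    "2023-01-10 16:07:36.359",
    "2023-01-10 16:07:36.464",
])
df = pd.DataFrame(
    {"streamid": [979, 1009, 979],
     "low": [1.07331, 1.07331, 1.07332],
     "high": [1.07344, 1.07338, 1.07345]},
    index=idx,
)

# Drop stale rows per streamid (the keyword is subset), then roll.
df = df.drop_duplicates(subset="streamid", keep="last")
df["low"] = df["low"].rolling(window="5s").max()
df["high"] = df["high"].rolling(window="5s").min()
```

In this toy frame the first 979 row is dropped outright, so the window ending at 16:07:36.359 no longer sees it, whereas the questioner's loop would still have included it there.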

If you need to keep the duplicates, I do not know of a workaround that avoids a for loop.
