I have a pandas DataFrame, updates, that looks like this:
streamid,low,high
time
2023-01-10 16:07:36.264,979,1.07331,1.07344
2023-01-10 16:07:36.359,1009,1.07331,1.07338
2023-01-10 16:07:36.444,781,1.07329,1.07341
2023-01-10 16:07:36.464,979,1.07331,1.07344
2023-01-10 16:07:36.470,1191,1.07331,1.0734
2023-01-10 16:07:36.480,1191,1.07333,1.07342
2023-01-10 16:07:36.493,2,1.07332,1.07337
2023-01-10 16:07:36.493,1009,1.07332,1.07338
2023-01-10 16:07:36.494,979,1.07332,1.07345
2023-01-10 16:07:36.494,786,1.07325,1.07332
2023-01-10 16:07:36.494,141,1.07332,1.07337
2023-01-10 16:07:36.496,1263,1.07332,1.07339
2023-01-10 16:07:36.496,818,1.07331,1.07338
2023-01-10 16:07:36.497,786,1.07325,1.07333
2023-01-10 16:07:36.499,844,1.07331,1.07336
2023-01-10 16:07:36.499,1009,1.07332,1.07339
2023-01-10 16:07:36.501,1028,1.07333,1.07337
2023-01-10 16:07:36.503,141,1.07333,1.07338
2023-01-10 16:07:36.504,1009,1.07333,1.0734
2023-01-10 16:07:36.509,1009,1.07333,1.07341
2023-01-10 16:07:36.509,786,1.07327,1.07335
I want to compute the rolling max of low and the rolling min of high over a 5s window, with one caveat: if, within any window, multiple rows share the same streamid, only the most recent of those rows should be considered.
Conceptually this should be simple: all I need to do is take each 5s rolling window, do a groupby on streamid, call last() on the GroupBy object to get the correct row for each group, and then run agg({"low": "max", "high": "min"}) over the window.
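On a single window, that per-group dedup plus aggregation can be sketched as follows (the miniature frame below uses made-up values purely for illustration):

```python
import pandas as pd

# A tiny frame in the same shape as one 5s window of `updates`
window = pd.DataFrame(
    {
        "streamid": [979, 1009, 979],
        "low": [1.07331, 1.07331, 1.07332],
        "high": [1.07344, 1.07338, 1.07345],
    },
    index=pd.to_datetime(
        ["2023-01-10 16:07:36.264",
         "2023-01-10 16:07:36.359",
         "2023-01-10 16:07:36.494"]
    ),
)

# Keep only the latest row per streamid, then aggregate:
# max of low, min of high
result = window.groupby("streamid").last().agg({"low": "max", "high": "min"})
print(result["low"], result["high"])  # 1.07332 1.07338
```

The earlier 979 row (low=1.07331, high=1.07344) is discarded by last(), so only its latest quote participates in the aggregation.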
In practice, I've found that I can't do a groupby on a rolling window, because rolling is applied to each column separately. I know I can use rolling with method='table' and apply with engine='numba', raw=True to get the whole frame inside a custom function, but I can't do a groupby inside a numba function.
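A quick probe confirms the per-column behaviour: a plain rolling.apply hands the function one column at a time (a Series per column per window), so there is no multi-column frame available to group:

```python
import pandas as pd

# Made-up miniature frame for illustration
df = pd.DataFrame(
    {"streamid": [1, 2, 1], "low": [1.0, 2.0, 3.0]},
    index=pd.to_datetime(
        ["2023-01-10 16:07:36", "2023-01-10 16:07:37", "2023-01-10 16:07:38"]
    ),
)

seen = []

def probe(x):
    # Record what rolling.apply actually passes in
    seen.append(type(x).__name__)
    return 0.0

df[["streamid", "low"]].rolling("5s").apply(probe)
print(set(seen))  # {'Series'}
```

Each call sees only one column's values for one window, which is why a per-window groupby on streamid isn't possible this way.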
Here is my loop solution, which is very slow but gives the correct answer:
import pandas as pd

lows = []
highs = []
times = []
rolled = updates.rolling("5s")
for df in rolled:  # one DataFrame per 5s window
    low, high = df.groupby("streamid").last().agg({"low": "max", "high": "min"})
    lows.append(low)
    highs.append(high)
    times.append(df.index[-1])
out_df = pd.DataFrame({
    "low": lows, "high": highs, "time": times
}).set_index("time")
This gives the result:
low,high
time
2023-01-10 16:07:36.264,1.07331,1.07344
2023-01-10 16:07:36.359,1.07331,1.07338
2023-01-10 16:07:36.444,1.07331,1.07338
2023-01-10 16:07:36.464,1.07331,1.07338
2023-01-10 16:07:36.470,1.07331,1.07338
2023-01-10 16:07:36.480,1.07333,1.07338
2023-01-10 16:07:36.493,1.07333,1.07337
2023-01-10 16:07:36.493,1.07333,1.07337
2023-01-10 16:07:36.494,1.07333,1.07337
2023-01-10 16:07:36.494,1.07333,1.07332
2023-01-10 16:07:36.494,1.07333,1.07332
2023-01-10 16:07:36.496,1.07333,1.07332
2023-01-10 16:07:36.496,1.07333,1.07332
2023-01-10 16:07:36.497,1.07333,1.07333 # <-- 1.07332 is dropped due to new update from 786
2023-01-10 16:07:36.499,1.07333,1.07333
2023-01-10 16:07:36.499,1.07333,1.07333
2023-01-10 16:07:36.501,1.07333,1.07333
2023-01-10 16:07:36.503,1.07333,1.07333
2023-01-10 16:07:36.504,1.07333,1.07333
2023-01-10 16:07:36.509,1.07333,1.07333
2023-01-10 16:07:36.509,1.07333,1.07335
Unfortunately, this takes 7-8 minutes on roughly 600k rows, and I want to run it repeatedly for different timestamps. Is there a better way, ideally one that avoids the loop?
That caveat is the tricky part. Do you need to keep the duplicate streamids? If not, drop the duplicates on streamid (note the parameter is subset, not on):
df.drop_duplicates(subset='streamid', keep='last', inplace=True)
then reassign low and high:
df.low = df.low.rolling(window="5s").max()
df.high = df.high.rolling(window="5s").min()
If you do need to keep the duplicates, I don't know of a workaround that avoids a for loop.
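As an end-to-end sketch of this drop-duplicates approach (on a made-up miniature frame; note that it dedups over the whole frame rather than per window, so it can diverge from the loop solution whenever an older duplicate should still count inside some window):

```python
import pandas as pd

# Hypothetical miniature version of the data, values made up for illustration
df = pd.DataFrame(
    {
        "streamid": [979, 1009, 979],
        "low": [1.07331, 1.07331, 1.07332],
        "high": [1.07344, 1.07338, 1.07345],
    },
    index=pd.to_datetime(
        ["2023-01-10 16:07:36.264",
         "2023-01-10 16:07:36.359",
         "2023-01-10 16:07:36.494"]
    ),
)

# Keep only the last update per streamid across the whole frame
deduped = df.drop_duplicates(subset="streamid", keep="last")

# Then roll over the deduped frame
out = pd.DataFrame(
    {
        "low": deduped["low"].rolling("5s").max(),
        "high": deduped["high"].rolling("5s").min(),
    }
)
print(out)
```

Because dedup happens once up front, this is vectorised and fast, at the cost of the per-window semantics the question asks for.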