重新采样巨大的 Pandas Dataframe 会抛出 ArrayMemoryError

问题描述 投票:0回答:1

在工作中,我们有一台机器,它正在记录一些传感器值(例如温度、压力、速度等)。我有一个 csv 文件,其中包含这些值,大约有 6 到 7 百万行和 13 列。但是,记录器不会在每一行中写入每个传感器。对于每个传感器,只有第一行和最后一行有一个值。之间的线确实有很多 NaN,并且线之间的时间步长不是恒定的。这里有几行给你一个印象:

2017-04-01 00:00:00.000,56.5611838935852,448,80.12215897219313,448,37.12826156616211,448,19.152511596679688,448,6516.8 85666666667,448,2.9019691567718984,448
2017-04-01 00:00:00.343,,,,,,,,,6517,704,,
2017-04-01 00:00:00.687,,,,,,,,,,,2.9009628295898438,704
2017-04-01 00:00:02.670,,,,,,,19.152511596679688,192,,,,
2017-04-01 00:00:03.343,,,,,,,,,6524,192,,
2017-04-01 00:00:03.453,56.55547332763672,192,,,,,,,,,,

我们希望使用这些值来进行动态模拟,以检查一切是否运行正常。我们使用的工具需要恒定的时间步长,并且在每个步骤中,所有值都必须作为输入给出。

因此,为了解决这个问题,我将csv读取为pandas数据帧,重新采样到1ms(因为csv中的时间戳具有毫秒精度),插入缺失值,然后重新采样到1s,这是所需的时间步长我们的动态模拟并保持较低的内存和磁盘空间要求。毫不奇怪,有了这么多数据,我在重采样到 1 毫秒的步骤中就已经耗尽了内存。

我确实阅读了有关“缩放到大型数据集”的 pandas 用户指南,并取得了一些成功。我加载了更少的列,使用了高效的数据类型并使用了分块。只有通过分块,我才能处理整个数据帧,但花了大约 1.5 小时,这在生产环境中是不可接受的。由于块是在 for 循环中处理的,因此处理是串行的。我阅读了有关 Dask 的部分,并认为并行化可以加快处理速度,但不知怎的,我又遇到了内存问题。 这是我尝试过的:

raw_df = pandas_df upsampled_df = raw_df.resample("1ms").asfreq() dask_df = dask.dataframe.from_pandas(upsampled_df, chunksize=chunk_size) interpolated_df = dask_df.map_partitions(lambda df: df.interpolate()).compute() downsampled_df = interpolated_df.resample("1s").asfreq()

这是 
upsampled_df = raw_df.resample("1ms").asfreq()

抛出的错误消息:


numpy.core._exceptions._ArrayMemoryError:无法为形状为(7862399001)和数据类型为int64的数组分配58.6 GiB

问题是(我认为)我仍然必须使用 pandas 来重新采样数据帧,因为 Dask 没有提供任何与返回重新采样数据帧的
asfreq()

函数等效的功能。

dask_df.resample("1ms")
确实有效,但返回一个重采样器对象,之后我无法对其进行插值。
您将如何处理这个问题?我知道,特别是在科学领域,需要处理大量数据帧是很常见的,但我确实在互联网上找不到任何有用的东西来解决我的问题。

请记住,我既不是 pandas,也不是 python 专业人士。所以我很有可能错过了一些明显的东西。

python pandas dataframe dask
1个回答
0
投票

如果我理解正确的话,如果我们按天分割数据并具有毫秒精度,则误差可以忽略不计

例如,开始 9 天的日期范围

import pandas as pd index = pd.date_range('1/1/2000', periods=9, freq='D') series = pd.Series(range(9), index=index)

对每天的块应用实际的UDF操作。对于每个块(天)。它将重新采样到 1ms(lambda 函数)

series.resample('D').apply(lambda x: x.resample('1ms').asfreq().interpolate().resample('1s').asfreq())

或30分钟

series.resample('30T').apply(lambda x: x.resample('1ms').asfreq().interpolate().resample('1s').asfreq())

当我尝试直接重新采样到 1ms 时,我确实遇到了内核死机(内存问题)

series.resample("1ms").asfreq()

    

© www.soinside.com 2019 - 2024. All rights reserved.