如何在 Google Colab 介质中使用 Python/Pyspark 高效地从大数据中过滤和检索特定记录?

问题描述 投票:0回答:1

我正在努力解决数据工程问题:

数据集特征

公共数据集 持续时间 虚拟机总数 数据集大小
Azure公共数据集V2 连续30天 2,695,548(~260万) 235GB(压缩后156GB)[198个文件]

img

情况

我需要读取 195 个

gzip
文件并从时间序列数据中过滤一些感兴趣的
vmid
记录。由于数据收集的质量(所有
vmid
的早期记录都在早期文件中,最新记录存储在最后一个文件中),我需要读取所有文件并根据感兴趣的
进行过滤vmid_lists
。问题是我使用 Google Colab 介质进行分析,它的计算资源有限,如果我天真地打开每个文件并使用 解压缩和连接数据帧,它就会崩溃。这种方法需要更多资源,但目前我没有访问权限。

  • 每个文件约为 800MB,包含 1M 行。
|   timestamp | vmid                                                             |   mincpu |   maxcpu |   avgcpu |
|:-----------:|:-----------------------------------------------------------------|---------:|---------:|---------:|
|           0 | yNf/R3X8fyXkOJm3ihXQcT0F52a8cDWPPRzTT6QFW8N+1QPfeKR5//6xyX0VYn7X |  19.8984 |  24.9964 |  22.6307 |

据我所知,有新的读取\加载包(Spark除外):

假设我想有效地收集和过滤列表中存在的有限

vmid
的记录(例如 ~2.6M 中的 4 个),当然最好的是:

  • 选择那些连续 30 天的数据或
  • 单独收集所有
    vmid
    数据并相应存储它们[例如镶木地板文件]

试用

到目前为止我尝试过的方法如下,这会导致内存不足(OOM)错误或笔记本电脑崩溃:

import dask.dataframe as dd

dfs = []
for i in range(1,196):
  print(i)
  df = dd.read_csv(f"https://azurecloudpublicdataset2.z19.web.core.windows.net/azurepublicdatasetv2/trace_data/vm_cpu_readings/vm_cpu_readings-file-{i}-of-195.csv.gz", blocksize=None)
  df.columns = ['timestamp', 'vmid', 'mincpu', 'maxcpu', 'avgcpu']
  dfs.append(df)

# Combine dataframes
combined_df = dd.concat(dfs)

# Select top 20 vmid
#top_vmid_counts = combined_df["vmid"].value_counts().head(20) # when this not feasible


# How to select continuous time data for selected vmids ---> vmid_lists = ...
vmid_lists = ["yNf/R3X8fyXkOJm3ihXQcT0F52a8cDWPPRzTT6QFW8N+1QPfeKR5//6xyX0VYn7X", #30 days data continuously
              "4WstS6Ub3GzHun4Mzb6BxLldKvkEkws2SZ9tbBV3kfLzOd+QRVETcgqLjtc3mCbD",
              "5f2jDjOhz6v00WonXOAuZW0uPO4OXjf5t64xYvOefcKwb4v7mOQtOZEVebAbiQq7",
              "E3fjqJ4h2SLfvLl9EV6/w9uc8osF0dw9dENCHteoNRLZTp500ezV9RPfyeMdOKfu",
             ]

top_vmid = combined_df[combined_df["vmid"].isin(vmid_lists)]

# Compute the result
result=top_vmid.compute()

假设

我认为考虑到我所解释的情况,建议的解决方案应该具有:

  • 逐个读取所有
    csv.gz
    文件,直到最后一个文件
  • 过滤器(如果你使用Pyspark更好通过
    .cashe()
    兑现桌子)
  • 连接数据帧
  • 存储在每个 vmid 的名称下 [例如
    .parquet
    .csv
    ]
  • 删除
    csv.gz
    以避免崩溃

问题\挑战

那么,在读取\加载阶段高效执行此任务以在默认设置的 Google Colab 介质中检索容易感兴趣的记录的最佳实践是什么?在Colab计划

中寻找一种聪明的方法来解决问题而不改变

任何帮助将不胜感激


Colab笔记本如果有兴趣尝试一下:Open in Colab

我发现的潜在相关帖子:

python pyspark google-colaboratory dask python-polars
1个回答
0
投票

一般来说,随机访问 gzip 压缩文件是不可能的。 (有一些可用的索引技术,但没有通用的解决方案https://github.com/pauldmccarthy/indexed_gzip

Dask 和 pyspark 旨在并行运行所有数据。他们将一次加载一个分区并将其解析为内存中的 pandas 数据帧,然后再执行任何进一步的过滤或聚合。使用 gzip,这意味着每个分区都是一个完整的文件,因此内存不足。

假设您的过滤生成适合内存的数据帧, 对于这种情况,最好的解决方案可能是使用 pandas 分块迭代来流式传输数据,这将流式传输 gzip 文件而不填充内存

it = pd.read_csv(..., iterator=True)
for chunk in it.get_chunk():
    # filter & aggregate or write to file

这个过程甚至可以使用延迟机制(https://docs.dask.org/en/stable/delayed.html)与dask(可能还有spark)并行,一次运行所有文件,因为流式传输单个线程中的所有数据将花费很长时间。

© www.soinside.com 2019 - 2024. All rights reserved.