我正在努力解决数据工程问题:
公共数据集 | 持续时间 | 虚拟机总数 | 数据集大小 |
---|---|---|---|
Azure公共数据集V2 | 连续30天 | 2,695,548(~260万) | 235GB(压缩后156GB)[198个文件] |
我需要读取 195 个
gzip
文件并从时间序列数据中过滤一些感兴趣的 vmid
记录。由于数据收集的质量(所有vmid
的早期记录都在早期文件中,最新记录存储在最后一个文件中),我需要读取所有文件并根据感兴趣的进行过滤vmid_lists
。问题是我使用 Google Colab 介质进行分析,它的计算资源有限,如果我天真地打开每个文件并使用 pandas 解压缩和连接数据帧,它就会崩溃。这种方法需要更多资源,但目前我没有访问权限。
| timestamp | vmid | mincpu | maxcpu | avgcpu |
|:-----------:|:-----------------------------------------------------------------|---------:|---------:|---------:|
| 0 | yNf/R3X8fyXkOJm3ihXQcT0F52a8cDWPPRzTT6QFW8N+1QPfeKR5//6xyX0VYn7X | 19.8984 | 24.9964 | 22.6307 |
据我所知,有新的读取\加载包bigdata(Spark除外):
假设我想有效地收集和过滤列表中存在的有限
vmid
的记录(例如 ~2.6M 中的 4 个),当然最好的是:
vmid
数据并相应存储它们[例如镶木地板文件]到目前为止我尝试过的方法如下,这会导致内存不足(OOM)错误或笔记本电脑崩溃:
import dask.dataframe as dd
dfs = []
for i in range(1,196):
print(i)
df = dd.read_csv(f"https://azurecloudpublicdataset2.z19.web.core.windows.net/azurepublicdatasetv2/trace_data/vm_cpu_readings/vm_cpu_readings-file-{i}-of-195.csv.gz", blocksize=None)
df.columns = ['timestamp', 'vmid', 'mincpu', 'maxcpu', 'avgcpu']
dfs.append(df)
# Combine dataframes
combined_df = dd.concat(dfs)
# Select top 20 vmid
#top_vmid_counts = combined_df["vmid"].value_counts().head(20) # when this not feasible
# How to select continuous time data for selected vmids ---> vmid_lists = ...
vmid_lists = ["yNf/R3X8fyXkOJm3ihXQcT0F52a8cDWPPRzTT6QFW8N+1QPfeKR5//6xyX0VYn7X", #30 days data continuously
"4WstS6Ub3GzHun4Mzb6BxLldKvkEkws2SZ9tbBV3kfLzOd+QRVETcgqLjtc3mCbD",
"5f2jDjOhz6v00WonXOAuZW0uPO4OXjf5t64xYvOefcKwb4v7mOQtOZEVebAbiQq7",
"E3fjqJ4h2SLfvLl9EV6/w9uc8osF0dw9dENCHteoNRLZTp500ezV9RPfyeMdOKfu",
]
top_vmid = combined_df[combined_df["vmid"].isin(vmid_lists)]
# Compute the result
result=top_vmid.compute()
我认为考虑到我所解释的情况,建议的解决方案应该具有:
csv.gz
文件,直到最后一个文件.cashe()
兑现桌子).parquet
或 .csv
]csv.gz
以避免崩溃那么,在读取\加载阶段高效执行此任务以在默认设置的 Google Colab 介质中检索容易感兴趣的记录的最佳实践是什么?在Colab计划
中寻找一种聪明的方法来解决问题而不改变任何帮助将不胜感激
我发现的潜在相关帖子:
一般来说,随机访问 gzip 压缩文件是不可能的。 (有一些可用的索引技术,但没有通用的解决方案https://github.com/pauldmccarthy/indexed_gzip)
Dask 和 pyspark 旨在并行运行所有数据。他们将一次加载一个分区并将其解析为内存中的 pandas 数据帧,然后再执行任何进一步的过滤或聚合。使用 gzip,这意味着每个分区都是一个完整的文件,因此内存不足。
假设您的过滤生成适合内存的数据帧, 对于这种情况,最好的解决方案可能是使用 pandas 分块迭代来流式传输数据,这将流式传输 gzip 文件而不填充内存
it = pd.read_csv(..., iterator=True)
for chunk in it.get_chunk():
# filter & aggregate or write to file
这个过程甚至可以使用延迟机制(https://docs.dask.org/en/stable/delayed.html)与dask(可能还有spark)并行,一次运行所有文件,因为流式传输单个线程中的所有数据将花费很长时间。