假设我有一个像“oscar2301”这样非常大的数据集。
我有一个 Apache Beam 管道,我想传递 HuggingFace 数据集。当我尝试将数据集传递给
beam.Create
时,代码往往会将整个数据集加载到内存中,从而导致内存越界错误。
众所周知,大型数据集通常存储在具有多个
.arrow
文件的缓存目录中。
我想了多种方法来处理数据集,既不使机器内存过载,又实现了流式处理方式来节省内存。
pipeline_args = PipelineOptions(
runner='FlinkRunner', # running locally
streaming=True,
)
with beam.Pipeline(options=pipeline_args) as pipeline:
preprocessed = (pipeline
| beam.Create(dataset['train']) #beam.Create(load_huggingface_data()) # parquet_files #ReadFromGenerator() #beam.Create(load_huggingface_data()) # (dataset['train'])
| beam.Map(prepare_oscar2301_for_c4)
#| beam.Map(prepare_mini_dataset_for_c4)
)
# Pass preprocessed rows to rest of pipeline
pages = preprocessed
deduped_pages = c4_utils.remove_duplicate_text(pages)
# Encode deduped_pages
encoded = deduped_pages | beam.Map(lambda page: json.dumps(page.__dict__, ensure_ascii=False).encode('utf-8'))
# Rest of pipeline
encoded | 'Write to JSONL' >> beam.io.WriteToText('/content/output.jsonl')
pipeline.run()
上面的代码往往会填满几乎 52GB 的 RAM,然后在卡在
eam.Create(dataset['train'])
近一个小时后耗尽内存。
是否有一种简单的方法可以以流式方式一次有效地加载和处理数据集一个元素,以避免内存不足错误?
一种选择是将数据集写入文件,然后使用 TextIO 加载该文件 - https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html - 这将处理每一行单独地并且不会尝试将所有内容立即加载到内存中。如果您最终想远程运行它,您可以将文件上传到远程文件系统(例如 gcs)或自定义容器(不过看起来这次运行是本地的)。
沿着beam.Create 的任何内容都必须将所有内容加载到内存中,因为它加载了值列表,因此可能没有一个很好的方法来实现这一点。