为了解析较大的文件,我需要循环写入大量的 parquet 文件。然而,似乎该任务消耗的内存在每次迭代中都会增加,而我希望它保持不变(因为不应在内存中附加任何内容)。这使得扩展变得很棘手。
我添加了一个最小可重现示例,该示例创建 10 000 个镶木地板并循环附加到它。
import resource
import random
import string
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
return ''.join(random.choice(chars) for _ in range(size))
schema = pa.schema([
pa.field('test', pa.string()),
])
resource.setrlimit(resource.RLIMIT_NOFILE, (1000000, 1000000))
number_files = 10000
number_rows_increment = 1000
number_iterations = 100
writers = [pq.ParquetWriter('test_'+id_generator()+'.parquet', schema) for i in range(number_files)]
for i in range(number_iterations):
for writer in writers:
table_to_write = pa.Table.from_pandas(
pd.DataFrame({'test': [id_generator() for i in range(number_rows_increment)]}),
preserve_index=False,
schema = schema,
nthreads = 1)
table_to_write = table_to_write.replace_schema_metadata(None)
writer.write_table(table_to_write)
print(i)
for writer in writers:
writer.close()
有人知道导致这种泄漏的原因以及如何防止它吗?
我们不确定出了什么问题,但其他一些用户报告了尚未诊断的内存泄漏。我将您的示例添加到跟踪 JIRA 问题之一https://issues.apache.org/jira/browse/ARROW-3324
2022 年更新:
我花了几天时间来解决 pyarrow 的内存泄漏问题。请参阅此处以获得更好的理解。我将把要点贴在下面。基本上,他们说这不是库内存泄漏问题,而是一种常见行为。
Pyarrow 使用 jemalloc,这是一种自定义内存分配器,它会尽力保留从操作系统分配的内存(因为这可能是一项昂贵的操作)。 不幸的是,这使得很难使用
等工具逐行跟踪内存使用情况memory_profiler
。有几个选项:
pyarrow.total_allocated_bytes
来跟踪分配,而不是使用memory_profiler
。import pyarrow as pa
pa.jemalloc_set_decay_ms(0)
您所看到的行为对于 jemalloc 来说非常典型。为了进一步阅读,您还可以查看这些其他问题以获取更多讨论和 jemalloc 行为的示例:
与 @user3503711 类似,我们在 PyArrow 中遇到了严重的内存分配问题。 PyArrow 在 macOS 上使用其
mimalloc
后端而不是 jemalloc
。 PyArrow 为自己分配了 GB 范围内的内存,杀死了我们的并行 pytest 测试运行程序(每个工作线程分配了大量内存,导致 Github Actions 无声地失败)。
我们无法完全摆脱 PyArrow 内存问题,并怀疑存在内部泄漏。不过,我们设法在一定程度上缓解了问题。
pyarrow.parquet.read_table
。您可能想要尝试的一些参数是 read_table(f, use_threads=False pre_buffer=False, memory_map=True)
。pytest
测试库,请确保不在测试装置中使用 Arrow/DataFrame,因为 pytest 可能会在幕后缓存这些对象del
关键字显式删除对象pyarrow
进行垃圾收集 import gc
gc.collect()
import pyarrow
pool = pyarrow.default_memory_pool()
pool.release_unused()
p = psutil.Process()
rss = p.memory_info().rss
print(f"Pool bytes {pool.bytes_allocated():,} max memory {pool.max_memory():,}, RSS after cleaning is {rss:,}")
此外