I have a large CSV (200 GB) that I have loaded into Dask and processed/manipulated into the form I need.
When executing the following line:
df_final.to_csv(prefix, sep="\t", index=False, header=False, chunksize=5000)
I hit the error below (definitely related to writing the CSV, not the earlier lazy computation). It consistently occurs after writing ~25 GB:
Traceback (most recent call last):
File "file.py", line 87, in <module>
dask_convert(directory, filename, prefix)
File "file.py", line 34, in dask_convert
df_final.to_csv(prefix, sep="\t", index=False, header=False, chunksize=5000)
File "/software/miniconda3/lib/python3.6/site-packages/dask/dataframe/core.py", line 1465, in to_csv
return to_csv(self, filename, **kwargs)
File "/software/miniconda3/lib/python3.6/site-packages/dask/dataframe/io/csv.py", line 865, in to_csv
delayed(values).compute(**compute_kwargs)
File "/software/miniconda3/lib/python3.6/site-packages/dask/base.py", line 283, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "/software/miniconda3/lib/python3.6/site-packages/dask/base.py", line 565, in compute
results = schedule(dsk, keys, **kwargs)
File "/software/miniconda3/lib/python3.6/site-packages/dask/threaded.py", line 84, in get
**kwargs
File "/software/miniconda3/lib/python3.6/site-packages/dask/local.py", line 487, in get_async
raise_exception(exc, tb)
File "/software/miniconda3/lib/python3.6/site-packages/dask/local.py", line 317, in reraise
raise exc
File "/software/miniconda3/lib/python3.6/site-packages/dask/local.py", line 222, in execute_task
result = _execute_task(task, data)
File "/software/miniconda3/lib/python3.6/site-packages/dask/core.py", line 121, in _execute_task
return func(*(_execute_task(a, cache) for a in args))
File "/software/miniconda3/lib/python3.6/site-packages/dask/utils.py", line 35, in apply
return func(*args, **kwargs)
File "/software/miniconda3/lib/python3.6/site-packages/dask/dataframe/io/csv.py", line 679, in _write_csv
df.to_csv(f, **kwargs)
File "/software/miniconda3/lib/python3.6/site-packages/fsspec/core.py", line 121, in __exit__
self.close()
File "/software/miniconda3/lib/python3.6/site-packages/fsspec/core.py", line 149, in close
_close(self.fobjects, self.mode)
File "/software/miniconda3/lib/python3.6/site-packages/fsspec/core.py", line 209, in _close
f.close()
OSError: [Errno 5] Input/output error
As you can see, I specified a small chunk size. I also have over 500 GB of free memory, so that isn't the problem either.
Any ideas on what I can do to fix this would be greatly appreciated!
Don't specify chunksize when calling dask.dataframe.to_csv; chunksize is not a parameter of that function. However, dask passes all extra keyword arguments through to pandas.DataFrame.to_csv, so the argument ends up being interpreted by pandas. You therefore get a situation where dask and pandas are both trying to manage chunks independently.
Instead, just make sure you have the partition sizes you want first, then write the CSV with the other arguments. Dask will automatically split your csv into multiple files.