我正在使用dask
处理10个文件,每个文件的大小约为142MB。我用延迟标签构建了一个方法,下面是一个示例:
@dask.delayed
def process_one_file(input_file_path, save_path):
res = []
for line in open(input_file_path):
res.append(line)
df = pd.DataFrame(line)
df.to_parquet(save_path+os.path.basename(input_file_path))
if __name__ == '__main__':
client = ClusterClient()
input_dir = ""
save_dir = ""
print("start to process")
cvss = [process_one_file(input_dir+filename, save_dir) for filename in os.listdir(input_dir)]
dask.compute(csvs)
但是,dask
并非总是成功运行。处理完所有文件后,该程序通常会挂起。我使用命令行来运行程序。程序在打印后经常会变黄start to process
。我知道程序可以正常运行,因为一段时间后我可以看到所有输出文件。但是程序永远不会停止。如果禁用了tls,则程序可以成功运行。太奇怪了,如果启用了tls连接,dask无法停止程序。我该如何解决?
[我发现如果添加to_parquet
方法,则程序无法停止,而如果删除该方法,则该程序将成功运行。
我发现了问题。我为每个进程设置了10GB。这意味着我设置了memory-limit=10GB
。我总共设置了2个工人,每个工人都有2个流程。每个进程有2个线程。因此,每台机器将有4个进程,占用40GB。但是,我的机器只有32GB。如果降低内存限制,则程序将成功运行!