极长行的火花任务如何处理?

问题描述 投票:0回答:1

我正在使用pyspark处理多个日志文件,在该文件中,一条记录被拆分为多行格式,因此我选择了WholeTextFiles来读取数据,然后过滤出我想要的内容。每个文件的大小约为800M,共有4096个文件。但是,火花作业在处理完一些任务后崩溃了,以下是我的配置和代码。

配置:

-num-executors 100--executor-cores 1--executor-memory 30G

核心代码:

file_rdd= sc.wholeTextFiles(inputDir, 2500)
print file_rdd.getNumPartitions()
out_rdd = file_rdd.flatMap(parseFileContent)\
                  .repartition(1000)\
                  .saveAsTextFile(title_outputDir)

在完成300多个任务后崩溃,运行日志显示如下:

crash log我也在这里获得了DAG可视化:

job dag visualization

似乎分区操作导致崩溃,并且阶段1从未启动。顺便说一句,这里是有关处理的任务的更多信息(如果有帮助的话):

tasks running and failure status 1

tasks running and failure status 2

tasks error log

我在hereherehere上找到了一些相关的问题,但它们仅说明高级内存消耗,而不是说明任务的执行方式(相对于文件中的每一行)以及多少每行都需要内存(处理时和处理后)。希望有人可以从这个复杂的概念中帮助我,并且将对如何在pyspark中处理多行格式文件提供任何建议(不是scala,我知道scala可以使用DataFrame来克服此问题,但我对scala并不熟悉) ,谢谢大家!

apache-spark pyspark yarn
1个回答
0
投票

.repartition(1000)将平衡您的spark分区,这是一项昂贵的操作,.coalesce(1000)只会将分区彼此堆叠,而不会为平衡它们而太费力,因此操作成本要低得多。使用.coalesce()将分区数从4096减少到1000,并在需要增加Spark分区数时保存.repartition()。尝试.coaleesce(1000),您最终将在输出中获得1000个文件。

© www.soinside.com 2019 - 2024. All rights reserved.