带有Window功能的Spark 12 GB数据加载性能问题

问题描述 投票:0回答:2

我使用sparksql转换12 GB数据。我的转换是在一个字段上应用行号和分区,然后将数据分成两组,第一组,其中行号为1,第二组包括其余数据,然后将数据写入目标位于30个分区。

我的工作目前大约需要1个小时。我想在不到10分钟的时间内运行它。

我正在使用规范(16个内核和32 GB RAM)在3节点集群上运行此作业。节点1纱线主节点。节点2两个执行器1驱动程序和另外1个节点3两个执行程序都用于处理。每个执行程序分配5个内核和10GB内存。

我的硬件够用还是需要更强大的硬件?执行人配置是对的吗?如果硬件和配置都很好,那么我肯定需要改进我的代码。

我的代码如下。

sqlContext=SQLContext(sc)

SalesDf = sqlContext.read.options(header='true').load(path, format='csv')
SalesDf.cache()

SalesDf_Version=SalesDf.withColumn('row_number',F.row_number().over(Window.partitionBy("id").orderBy(desc("recorddate"))))

initialLoad = SalesDf_Version.withColumn('partition',SalesDf_Version.year).withColumn('isActive', when(col('row_number') == 1, lit('Y')).when(col('row_number') != 1, lit('N')))
initialLoad = initialLoad.withColumn('version_flag',col ('isActive')).withColumn('partition',col('city'))
initialLoad = initialLoad.drop('row_number')


initialLoad.coalesce(1).write.partitionBy('isActive','partition').option("header", "true").mode('overwrite').csv(path +'Temp/target/')
initialLoad.coalesce(1).write.partitionBy('isActive','partition').option("header", "true").mode('overwrite').csv(path +'Temp/target/')

sc.stop()

在此先感谢您的帮助

apache-spark pyspark hadoop2
2个回答
3
投票

在写作之前你有一个coalesce(1),这是什么原因? Coalesce减少了该阶段的并行化,在您的情况下将导致窗口查询在1个核心上运行,因此您将失去每个节点16个核心的优势。

删除合并,这应该开始改进的东西。


1
投票

以下是我们为提高代码性能而实施的更改。

我们删除了coalesce并使用了重新分区(50)。我们在括号中尝试了更高和更低的数字,但在我们的案例中,50是优化的数字。我们使用s3作为我们的目标,但由于重新命名火花中的东西而花费了我们很多,所以我们使用HDFS而我们的工作时间减少到之前的一半。总的来说,通过上述更改,我们的代码运行了12分钟,之前是50分钟。谢谢Ammar

© www.soinside.com 2019 - 2024. All rights reserved.