Pyspark在使用大量列保存数据框时遇到问题

问题描述 投票:1回答:1

在Hortonworks集群上通过Jupyter笔记本使用Pyspark 1.6.2处理以下步骤时,我们遇到了一个奇怪的情况:

  1. 从pyspark数据帧中的ORC表读取数据
  2. pivot_columnpivoted_df)转动此表
  3. 在特定选择的pivoted_df上添加一些计算列:calced_df = pivoted_df.select(dependency_list).run_calculations()
  4. 内部加入大表pivoted_df(列> 1.600)和“小”表calced_df(仅约270列)联合所有列
  5. 保存到木地板桌子

(在步骤3中,选择是必要的,否则使用withColumn语句添加一些计算字段需要很长时间。在具有大量列的表上,选择+ Join比withColumn更快)

但是,对于pivot_column变化少于2.500的数据集,工作正常。例如,我们从75.430.000行和1.600列开始成功处理作业。当我们使用另一个包含较少行(50.000)和更多列(2.433)的数据集处理作业时,它也正常工作。

但最后这项工作在最后一步崩溃,因为pivot_column有超过2500种变化(并且只有大约70,000行),并且存在Stackoverflow错误。我们使用一些show()操作调试了单个步骤,以检查作业失败的确切位置。我们发现,一切正常,直到第4步中的Join。因此,连接导致了问题,因为这一步我们得到以下消息:

Py4JJavaError: An error occurred while calling o4445.showString.
: java.lang.StackOverflowError
    at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:24)
    at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:22)
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.SetBuilder.$plus$plus$eq(SetBuilder.scala:22)
    at scala.collection.TraversableLike$class.to(TraversableLike.scala:629)
    at scala.collection.AbstractTraversable.to(Traversable.scala:105)
    at scala.collection.TraversableOnce$class.toSet(TraversableOnce.scala:267)
    at scala.collection.AbstractTraversable.toSet(Traversable.scala:105)
    at org.apache.spark.sql.catalyst.trees.TreeNode.containsChild$lzycompute(TreeNode.scala:86)
    at org.apache.spark.sql.catalyst.trees.TreeNode.containsChild(TreeNode.scala:86)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:280)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:264)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)

但究竟是什么导致了这个错误,我们怎么能避免它呢?

我们当前的Spark配置:

.setMaster("yarn-client") \
.set("spark.executor.instances","10") \
.set("spark.executor.cores","4") \
.set("spark.executor.memory","10g") \
.set("spark.driver.memory","8g") \
.set("spark.yarn.executor.memoryOverhead","1200") \
.set("spark.sql.pivotMaxValues", "6000") \
.set("spark.sql.inMemoryColumnarStorage.batchSize", "1000")

提前谢谢了!

apache-spark pyspark apache-spark-sql pivot stack-overflow
1个回答
0
投票

在压缩和保存之前,执行程序将保存到镶木地板的数据保存为镶嵌格式(在spark 1.6中 - 在2.x中更改,请参见https://spoddutur.github.io/spark-notes/deep_dive_into_storage_formats.html) - 您可以增加分区数(spark.sql.shuffle.partitions)或执行者的记忆

请注意,python占用了spark执行器的一些内存(spark.python.worker.memory设置控制它)

您可以帮助的另一件事是在保存到镶木地板之前对数据进行排序

© www.soinside.com 2019 - 2024. All rights reserved.