例如,我在带有Spark 2.4.4的大窗口中使用Window函数,>
Window .partitionBy("id") .orderBy("timestamp")
在我的测试中,我有大约70个不同的ID,但我可能有大约20万行ID。在没有进一步配置的情况下,我必须为执行程序分配大量内存,以避免此OOM:
org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 16384 bytes of memory, got 0 at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157) at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:98) at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:128) at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:161) at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:128) at org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.add(ExternalAppendOnlyUnsafeRowArray.scala:115) at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11$$anon$1.fetchNextPartition(WindowExec.scala:345) at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11$$anon$1.next(WindowExec.scala:371) at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11$$anon$1.next(WindowExec.scala:303) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage15.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$12$$anon$1.hasNext(WholeStageCodegenExec.scala:631) at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11$$anon$1.fetchNextRow(WindowExec.scala:314) at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11$$anon$1.<init>(WindowExec.scala:323) at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11.apply(WindowExec.scala:303) at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11.apply(WindowExec.scala:302) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
查看源代码后,我发现了此参数,该参数根本没有记录:
spark.sql.windowExec.buffer.in.memory.threshold
给它大尺寸(例如1.000.000),我不再需要太多内存。据我了解,这是缓冲的行数。我猜想增加此参数不会重复执行程序内存中的行,但这对我来说并不是很清楚。
有人可以向我确切说明执行者端如何处理窗口吗?为什么要重复数据?如何避免这种重复并使进程更快,每个窗口中有很多行?可以使用哪些参数?
Thx。
例如,我将Window函数用于带有Spark 2.4.4的大窗口。 Window .partitionBy(“ id”).orderBy(“ timestamp”)在我的测试中,我有大约70个不同的ID,但我可能有大约20万行ID。 ...
我发现了这个参数,根本没有记录: