SQL query in Spark/Scala: Size exceeds Integer.MAX_VALUE

Question · Votes: 24 · Answers: 1

I am trying to run a simple SQL query on S3 events using Spark. I am loading ~30GB of JSON files as follows:

val d2 = spark.read.json("s3n://myData/2017/02/01/1234");
d2.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK);
d2.registerTempTable("d2");

Then I try to write the result of my query to a file:

val users_count = sql("select count(distinct data.user_id) from d2");
users_count.write.format("com.databricks.spark.csv").option("header", "true").save("s3n://myfolder/UsersCount.csv");

But Spark throws the following exception:

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1287)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:439)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:672)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Note that the same query works fine on smaller amounts of data. What is the problem here?

sql apache-spark amazon-ec2 emr
1 Answer

55 votes

No Spark shuffle block can be larger than 2GB (Integer.MAX_VALUE bytes), so you need more / smaller partitions.

You should tweak spark.default.parallelism and spark.sql.shuffle.partitions (default 200) so that the number of partitions can accommodate your data without hitting the 2GB limit (you could aim for roughly 256MB per partition, so for 200GB you would get about 800 partitions). Having tens of thousands of partitions is very common, so don't be afraid to repartition to, say, 1000.
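As a minimal sketch (not from the original answer), assuming a Spark 2.x SparkSession named spark as in the question, and with purely illustrative values, the shuffle partition count could be raised before running the query:

// Illustrative value only: ~256MB per partition for ~200GB of shuffle data.
spark.conf.set("spark.sql.shuffle.partitions", "800")

// spark.default.parallelism is read when the SparkContext starts, so it is
// normally set when the session is built, or via spark-submit --conf, e.g.:
// val spark = SparkSession.builder().config("spark.default.parallelism", "800").getOrCreate()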

FYI, you can check the number of partitions of an RDD with rdd.getNumPartitions (i.e. d2.rdd.getNumPartitions).
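For example, under the same assumptions, you could inspect the partition count of the cached DataFrame and repartition it to the suggested 1000 partitions before re-registering the temp view (createOrReplaceTempView is the Spark 2.x replacement for registerTempTable):

// Inspect how many partitions the cached DataFrame currently has.
println(s"partitions before: ${d2.rdd.getNumPartitions}")

// More, smaller partitions keep each shuffled/cached block under the 2GB limit.
val d2Repartitioned = d2.repartition(1000)
d2Repartitioned.createOrReplaceTempView("d2")
println(s"partitions after: ${d2Repartitioned.rdd.getNumPartitions}")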

There is a JIRA issue tracking the effort to address the various 2GB limits (it has been open for a while now): https://issues.apache.org/jira/browse/SPARK-6235

For more information on this error, see http://www.slideshare.net/cloudera/top-5-mistakes-to-avoid-when-writing-apache-spark-applications/25
