在 Spark 中导入 parquet 文件时出现内存问题

问题描述 投票:0回答:2

我正在尝试从 Scala Spark (1.5) 中的 parquet 文件查询数据,包括 200 万行的查询(以下代码中的“变体”)。

val sqlContext = new org.apache.spark.sql.SQLContext(sc)  
sqlContext.sql("SET spark.sql.parquet.binaryAsString=true")

val parquetFile = sqlContext.read.parquet(<path>)

parquetFile.registerTempTable("tmpTable")
sqlContext.cacheTable("tmpTable")

val patients = sqlContext.sql("SELECT DISTINCT patient FROM tmpTable ...)

val variants = sqlContext.sql("SELECT DISTINCT ... FROM tmpTable ... )

当获取的行数较低时,此方法运行良好,但当请求大量数据时,会失败并出现“大小超过 Integer.MAX_VALUE”错误。 错误如下:

User class threw exception: org.apache.spark.SparkException:
Job aborted due to stage failure: Task 43 in stage 1.0 failed 4 times,
most recent failure: Lost task 43.3 in stage 1.0 (TID 123, node009):
java.lang.RuntimeException: java.lang.IllegalArgumentException:
Size exceeds Integer.MAX_VALUE at
sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828) at
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125) at
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113) at ...

我该怎么做才能让这项工作成功?

这看起来像是一个内存问题,但我尝试使用最多 100 个执行程序,没有任何区别(无论涉及多少执行程序,失败所需的时间也保持不变)。感觉数据没有跨节点分区?

我试图通过天真地替换这一行来强制更高的并行化,但无济于事:

val variants = sqlContext.sql("SELECT DISTINCT ... FROM tmpTable ... ).repartition(sc.defaultParallelism*10)
scala apache-spark apache-spark-sql parquet
2个回答
7
投票

我不认为这个问题是镶木地板特有的。您“遇到”了 Spark 分区最大大小的限制。

大小超过 Integer.MAX_VALUE sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828) 在...

Integer.MAX_VALUE 检测到您的分区大小(我相信)超过 2GB(需要超过 int32 来为其建立索引)。

乔·威登的评论很到位。您甚至需要对数据进行重新分区。尝试 1000 或更多。

例如,

val data = sqlContext.read.parquet("data.parquet").rdd.repartition(1000).toDF

0
投票

您还可以尝试使用以下 Spark 配置来限制分区大小:

spark.sql.files.maxPartitionBytes = <Size in bytes> (Default 128 MB)

根据https://spark.apache.org/docs/latest/sql-performance-tuning.html

读取文件时打包到单个分区的最大字节数。此配置仅在使用基于文件的源(例如 Parquet、JSON 和 ORC)时有效。

这可确保在任何给定时间节点的每个核心仅在指定的卷上运行。 Spark 将根据此值创建适当数量的分区,以提高程序的并行性。 将此值设置为

0.25 * memory per core

是一个很好的经验法则

例如:

If your node has 16 cores, 32 GB mem. 
mem_per_core = 32/16 = ~2 GB (ignoring the overhead)
max_partition_mb = 0.25 * (2 * 1024) = 512 MBs
© www.soinside.com 2019 - 2024. All rights reserved.