How can I ensure that each partition is smaller than maxSize?


Suppose I use a SparkSession to convert a large CSV file (800K lines) into a DataFrame, like this:

// csv holds the whole file contents as a single String
val lines = csv.stripMargin.split('\n')
val rdd = spark.sparkContext.parallelize(lines)
val df = rdd.toDF

The toDF call fails with:

Job aborted due to stage failure: Serialized task 180094:0 was 169353207 bytes, which exceeds max allowed: spark.rpc.message.maxSize (134217728 bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values.

I understand what it means and I don't want to increase spark.rpc.message.maxSize. So I want to compute how many partitions to split lines into, so that each partition stays below spark.rpc.message.maxSize:

val maxSize = ... // "spark.rpc.message.maxSize" in bytes

// Since lines may have different sizes and we don't want to check all of them,
// we divide by maxSize * 0.8 instead of maxSize as a safety margin
val numPartitions = math.ceil(csv.getBytes().length / (maxSize * 0.8)).toInt

val rdd = spark.sparkContext.parallelize(lines, numPartitions)
val df = rdd.toDF
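
For reference, maxSize here is just the RPC limit converted to bytes: spark.rpc.message.maxSize is configured in MiB (128 by default, which matches the 134217728 bytes in the error), so one way to derive it is something like:

// read the limit (in MiB) from the session config, falling back to the 128 MiB default
val maxSizeMiB = spark.conf.get("spark.rpc.message.maxSize", "128").toInt
val maxSize = maxSizeMiB.toLong * 1024 * 1024 // in bytes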

Does this make sense? Is there a better way to guarantee that every partition stays below spark.rpc.message.maxSize?

scala apache-spark
1 Answer

This does make sense: unless you pass a partition count, parallelize falls back to spark.default.parallelism, and that default is based on cores, not on how many bytes land in each partition, so a large input can still exceed the RPC limit!
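
You can check what you actually get on your cluster with getNumPartitions:

val unsized = spark.sparkContext.parallelize(lines)
// with no explicit count, this equals spark.default.parallelism for this cluster
println(unsized.getNumPartitions)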

As a rule of thumb, you can pick the number of partitions like this:

val maxSize = ... // "spark.rpc.message.maxSize" in bytes
val numberOfCores = ... // "spark.cores.max"
val fileSize = csv.getBytes().length
// each partition must not exceed maxSize; use the per-core share if that is smaller
val partitionSize = math.min(maxSize, fileSize / numberOfCores).toInt
// divide as Double so ceil doesn't operate on an already-truncated integer quotient
val numPartitions = math.ceil(fileSize.toDouble / partitionSize).toInt

val df = spark.sparkContext.parallelize(lines, numPartitions).toDF
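
If you want a rough sanity check, you can sum the raw UTF-8 bytes per partition and compare against maxSize (a sketch only; the serialized task is somewhat larger than the raw text, so keep some margin):

val checked = spark.sparkContext.parallelize(lines, numPartitions)
// one Long per partition: the total UTF-8 byte size of the lines it holds
val partitionBytes = checked
  .mapPartitions(it => Iterator(it.map(_.getBytes("UTF-8").length.toLong).sum))
  .collect()
assert(partitionBytes.forall(_ < maxSize))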

But you could also try Spark's built-in CSV reader, which partitions the input for you automatically:

val df = spark.read.csv(path)
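
With the file-based reader the data is not shipped inside the tasks themselves, so the RPC limit generally isn't hit; input partition sizes are instead governed by spark.sql.files.maxPartitionBytes (128 MB by default), which you can lower if you want smaller partitions. A sketch, assuming path points at the CSV file and that it has a header row:

// cap the bytes packed into one input partition (value in bytes)
spark.conf.set("spark.sql.files.maxPartitionBytes", 64L * 1024 * 1024)
val df = spark.read
  .option("header", "true") // assumption: the first line is a header
  .csv(path)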