Suppose I use SparkSession to convert a large CSV file (800K lines) into a DataFrame like this:
val lines = csv.stripMargin.split('\n')
val rdd = spark.sparkContext.parallelize(lines)
val df = rdd.toDF
This toDF call fails with:
Job aborted due to stage failure: Serialized task 180094:0 was 169353207 bytes, which exceeds max allowed: spark.rpc.message.maxSize (134217728 bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values.
I understand what this means, and I don't want to increase spark.rpc.message.maxSize. So instead I want to compute the number of partitions to split lines into, such that every partition stays below spark.rpc.message.maxSize:
val maxSize = ... // "spark.rpc.message.maxSize" in bytes
// Since lines may have different sizes and we don't want to measure every line,
// we target maxSize * 0.8 per partition instead of maxSize, as a safety margin.
// Note the parentheses: without them, "/ maxSize * 0.8" parses as
// (length / maxSize) * 0.8, which *shrinks* the partition count instead.
val numPartitions = math.ceil(csv.getBytes().length / (maxSize * 0.8)).toInt
val rdd = spark.sparkContext.parallelize(lines, numPartitions)
val df = rdd.toDF
Does that make sense? Is there a better way to guarantee that each partition always stays below spark.rpc.message.maxSize?
This does make sense: unless you pass an explicit number of partitions, parallelize splits the collection into spark.default.parallelism slices (typically the number of cores), and for a large input each of those slices can easily exceed the RPC message limit.
As a general rule, you can choose the number of partitions as:
val maxSize = ... // "spark.rpc.message.maxSize" in bytes
val numberOfCores = ... // "spark.cores.max"
val fileSize = csv.getBytes().length
// Target the smaller of the RPC limit and an even split across all cores
val partitionSize = math.min(maxSize, fileSize / numberOfCores)
// Divide as Double so ceil can round up; integer division would truncate first
val numPartitions = math.ceil(fileSize.toDouble / partitionSize).toInt
val df = spark.sparkContext.parallelize(lines, numPartitions).toDF
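The rule above could be wrapped in a small helper. This is only a sketch: the name safeParallelize is hypothetical, and it assumes spark.rpc.message.maxSize is set in MiB (its documented unit, 128 by default) and uses defaultParallelism as the core count.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Hypothetical helper: parallelize pre-split CSV lines so that each
// partition's serialized size stays under spark.rpc.message.maxSize.
def safeParallelize(spark: SparkSession, lines: Seq[String], totalBytes: Long): RDD[String] = {
  // The setting is expressed in MiB; convert to bytes (default 128 MiB).
  val maxSize = spark.conf.get("spark.rpc.message.maxSize", "128").toLong * 1024 * 1024
  val cores = spark.sparkContext.defaultParallelism
  // Smaller of the RPC limit and an even split across the available cores
  val partitionSize = math.min(maxSize, totalBytes / cores)
  // Round up with floating-point division so no partition exceeds partitionSize
  val numPartitions = math.ceil(totalBytes.toDouble / partitionSize).toInt
  spark.sparkContext.parallelize(lines, numPartitions)
}
```

Running it requires an active SparkSession, so it is shown here without output.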
But you can also simply use Spark's built-in CSV reader, which partitions the input for you automatically:
val df = spark.read.csv(path)
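For example, with a couple of common reader options (header and inferSchema are standard DataFrameReader options; path is assumed to point at your CSV file):

```scala
// The built-in reader splits the file into input partitions based on
// spark.sql.files.maxPartitionBytes (128 MB by default), so the driver
// never has to ship the whole file through the RPC layer.
val df = spark.read
  .option("header", "true")      // treat the first line as column names
  .option("inferSchema", "true") // sample the file to pick column types
  .csv(path)
```

This avoids the partition-size arithmetic entirely, at the cost of an extra pass over the file when inferSchema is enabled.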