我正在尝试找到一种可靠的方式来以编程方式计算Spark数据帧的大小(以字节为单位)。
原因是我想拥有一种计算“最佳”数量的分区的方法(“最佳”在这里可能意味着不同的东西:在写入Parquet表时可能表示having an optimal partition size或resulting in an optimal file size-但是两者都可以假定是数据帧大小的某种线性函数。换句话说,我想在数据帧上调用coalesce(n)
或repartition(n)
,其中n
不是固定数字,而是数据帧大小的函数。
关于SO的其他主题建议使用SizeEstimator.estimate
中的org.apache.spark.util
来获取数据帧的字节大小,但是我得到的结果不一致。
首先,我将数据帧持久存储到内存中:
df.cache().count
Spark UI在“存储”选项卡中显示4.8GB的大小。然后,我运行以下命令从SizeEstimator
中获取大小:
import org.apache.spark.util.SizeEstimator
SizeEstimator.estimate(df)
这将得到115'715'808字节=〜116MB。但是,将SizeEstimator
应用于不同的对象会导致非常不同的结果。例如,我尝试分别计算数据帧中每一行的大小并求和:
df.map(row => SizeEstimator.estimate(row.asInstanceOf[ AnyRef ])).reduce(_+_)
这导致大小为12'084'698'256字节=〜12GB。或者,我可以尝试将SizeEstimator
应用于每个分区:
df.mapPartitions(
iterator => Seq(SizeEstimator.estimate(
iterator.toList.map(row => row.asInstanceOf[ AnyRef ]))).toIterator
).reduce(_+_)
再次导致10'792'965'376字节的不同大小=〜10.8GB。
我知道其中涉及内存优化/内存开销,但是执行这些测试后,我看不到如何使用SizeEstimator
来获得足够好的数据帧大小(以及分区大小或结果)的估计值实木复合地板文件大小)。
为了获得对数据帧大小或其分区的良好估计,应用SizeEstimator
的适当方法是什么(如果有的话)?如果没有,这里建议的方法是什么?
[不幸的是,我无法从SizeEstimator
获得可靠的估计,但是我可以找到另一种策略-如果数据帧已缓存,我们可以如下从queryExecution
中提取其大小:
df.cache.foreach(_=>_)
val catalyst_plan = df.queryExecution.logical
val df_size_in_bytes = spark.sessionState.executePlan(
catalyst_plan).optimizedPlan.stats.sizeInBytes
对于示例数据帧,这恰好提供了4.8GB(这也对应于写入未压缩的Parquet表时的文件大小)。
这具有需要缓存数据帧的缺点,但就我而言这不是问题。
SizeEstimator
返回对象在JVM堆上占用的字节数。这包括对象所引用的对象,实际对象的大小几乎总是很小。
您观察到的大小差异是因为当您在JVM上创建新对象时,引用也占用了内存,并且正在对此进行计数。
在这里查看文档🤩https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.util.SizeEstimator$
除了您已经尝试过的大小估计器(很好的见识。。]
下面是另一个选项
RDDInfo[] getRDDStorageInfo()
返回有关哪些RDD是已缓存,如果它们在mem中或同时在这两者上,返回它们占用了多少空间等信息
实际上火花存储选项卡使用此。Spark docs
/**
* :: DeveloperApi ::
* Return information about what RDDs are cached, if they are in mem or on disk, how much space
* they take, etc.
*/
@DeveloperApi
def getRDDStorageInfo: Array[RDDInfo] = {
getRDDStorageInfo(_ => true)
}
private[spark] def getRDDStorageInfo(filter: RDD[_] => Boolean): Array[RDDInfo] = {
assertNotStopped()
val rddInfos = persistentRdds.values.filter(filter).map(RDDInfo.fromRdd).toArray
rddInfos.foreach { rddInfo =>
val rddId = rddInfo.id
val rddStorageInfo = statusStore.asOption(statusStore.rdd(rddId))
rddInfo.numCachedPartitions = rddStorageInfo.map(_.numCachedPartitions).getOrElse(0)
rddInfo.memSize = rddStorageInfo.map(_.memoryUsed).getOrElse(0L)
rddInfo.diskSize = rddStorageInfo.map(_.diskUsed).getOrElse(0L)
}
rddInfos.filter(_.isCached)
}
RDD中的yourRDD.toDebugString
也使用此。代码here
我认为,要获取每个分区中的最佳记录数并检查您的分区是否正确并且它们是均匀分布的,我建议尝试如下操作...并调整您的分区数。然后测量分区的大小...会更明智。解决这种problems
yourdf.rdd.mapPartitionsWithIndex{case (index,rows) => Iterator((index,rows.size))}
.toDF("PartitionNumber","NumberOfRecordsPerPartition")
.show
或现有的Spark功能(基于Spark版本)
import org.apache.spark.sql.functions._
df.withColumn("partitionId", sparkPartitionId()).groupBy("partitionId").count.show
@@ hiryu如果我们正在读取整个数据集/表,则从计划中获取数据框大小的解决方案将起作用。但是它似乎忽略了谓词。
例如,我有一个具有过滤条件的查询,其输出存储在数据框中。当我计算数据框的大小时,它给出了总表大小,而不是与谓词匹配的几个分区大小。