如何在 Spark Streaming 作业中查找数据帧的大小

问题描述 投票:0回答:2

我试图找到每批 Spark Streaming 作业中数据帧的大小。我能够成功地找到批处理作业的大小,但是当涉及到流式传输时,我无法做到这一点。

我一直在 databricks 上开发 Spark 应用程序,并在流作业中尝试了“df.queryExecution.optimizedPlan.stats.sizeInBytes”。 但出现以下异常: 流式源的查询必须使用 writeStream.start();;

执行

我尝试将“df.queryExecution.optimizedPlan.stats.sizeInBytes”放入 forEachBatch() 函数中:

data.writeStream.foreachBatch { (df: DataFrame, batchId: Long) =>
df.persist() 
println("The size of the read is : " + df.queryExecution.optimizedPlan.stats.sizeInBytes)                              
}.start.option("checkpointLocation", outpath + "/_checkpoint") 

但这将创建一个新的流,由于一些限制,我们需要避免它。

val data = spark.readStream
                .format("kafka") 
                .option(....)
                .load()

println("The size of the read is : " + data.queryExecution.optimizedPlan.stats.sizeInBytes) 

是否有任何 hack 或任何 api 调用可以在不使用“forEachBatch()”或不创建新流的情况下返回流中数据帧的大小?

python scala apache-spark spark-structured-streaming
2个回答
0
投票

您可以尝试关注

进口量少

import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd
import org.apache.spark.util.SizeEstimator

计算RDD的大小

def calcRDDSize(rdd: RDD[String]): Long = {
    rdd.map(_.getBytes("UTF-8").length.toLong)
    .reduce(_+_) //add the sizes together
}

尺寸计算方式为

val rdd1 = df.rdd.map(_.toString())
calcRDDSize(rdd1)

其中 df 是您的数据框。它将估计大小(以字节为单位)。

希望这有帮助:)


0
投票

无法发表评论,所以在这里询问你是否找到了解决方案?

© www.soinside.com 2019 - 2024. All rights reserved.