这是我使用SparkListener编写的代码。我正在使用Spark 2.4.4。
class CustomListener extends SparkListener {
var recordsReadCount = 0L
var recordsWrittenCount = 0L
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
synchronized {
if(taskEnd.taskMetrics.inputMetrics!=None) {
recordsReadCount += taskEnd.taskMetrics.inputMetrics.recordsRead
}
if(taskEnd.taskMetrics.outputMetrics!=None) {
recordsWrittenCount += taskEnd.taskMetrics.outputMetrics.recordsWritten
}
println(s"WRITTEN : $recordsWrittenCount READ : $recordsReadCount")
}
}
}
我获得输入指标的非零结果,但无法获得输出指标的结果。并且[[是,我正在以增量格式写入数据我得到“ WRITTEN:0”作为输出。调用main(sc
为SparkSession
):
val myListener=new CustomListener
sc.sparkContext.addSparkListener(myListener)
// my write operation goes here
sc.sparkContext.removeSparkListener(myListener)