为什么即使没有新数据,Spark Streaming也会执行foreachRDD?

问题描述 投票:1回答:1

我有以下Spark流示例:

val conf = new SparkConf().setAppName("Name").setMaster("local")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(2))

val directoryStream = ssc.textFileStream("""file:///C:/Users/something/something""")
directoryStream.foreachRDD(file => {
  println(file.count())
})

ssc.start()
ssc.awaitTermination()

即使文件夹为空,它也会每2秒保持打印0,就像文件夹中有空文件一样。我希望它只在文件夹中存在新文件时进入foreachRDD。有什么我做错了吗?

我使用Spark 1.6和Scala 2.10.7。

scala apache-spark spark-streaming
1个回答
4
投票

由于批处理持续时间为2秒,作业将每隔2秒触发一次,基本上触发点不是数据可用性,它是批处理持续时间,如果DStream包含数据时存在的数据,否则它将为空(使用下面的代码来检查相同的)

 dstream.foreachRDD{ rdd => if (!rdd.isEmpty) {// do something } }
© www.soinside.com 2019 - 2024. All rights reserved.