package com.scala.sparkStreaming
import org.apache.spark._
import org.apache.spark.streaming._
object Demo1 {
def main(assdf:Array[String]){
val sc=new SparkContext("local","Stream")
val stream=new StreamingContext(sc,Seconds(2))
val rdd1=stream.textFileStream("D:/My Documents/Desktop/inbound/sse/ssd/").cache()
val mp1= rdd1.flatMap(_.split(","))
print(mp1.count())
stream.start()
stream.awaitTermination()
}
}
我已经运行了它,然后它显示一个异常的
org.apache.spark.streaming.dstream.MappedDStream@6342993220/05/22 18:14:16 ERROR StreamingContext: Error starting the context, marking it as stopped
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:277)
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:169)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:517)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:577)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:576)
at com.scala.sparkStreaming.Demo1$.main(Demo1.scala:18)
at com.scala.sparkStreaming.Demo1.main(Demo1.scala)
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:277)
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:169)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:517)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:577)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:576)
at com.scala.sparkStreaming.Demo1$.main(Demo1.scala:18)
at com.scala.sparkStreaming.Demo1.main(Demo1.scala)
错误信息 "没有注册任何输出操作,所以没有任何东西可以执行 "给出了一个提示,即缺少了一些东西。
你的直接流 rdd1
和 mp1
没有 行动. A flatMap
仅是 转型 其中被Spark懒得评估。这就是为什么 stream.start()
方法会抛出这个Exception。
根据文档,你可以 打印出一份RDD 如下图所示。由于你处理的是一个流,你可以通过RDDs迭代。我已经用Spark 2.4.5版本成功测试了下面的代码。
文档中的 textFileStream
说,它 "监控一个与Hadoop兼容的文件系统,以获取 新文件 并将其作为文本文件读取",所以请确保您添加modify您要读取的文件。而 作业正在运行。
另外,虽然我对Windows上的Spark并不完全熟悉,但你可能需要将目录字符串改为
file://D:\\My Documents\\Desktop\\inbound\\sse\\ssd
下面是Spark Streaming的完整代码示例。
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
object Main extends App {
val sc=new SparkContext("local[1]","Stream")
val stream=new StreamingContext(sc,Seconds(2))
val rdd1 =stream.textFileStream("file:///path/to/src/main/resources")
val mp1= rdd1.flatMap(_.split(" "))
mp1.foreachRDD(rdd => rdd.collect().foreach(println(_)))
stream.start()
stream.awaitTermination()
}
在Spark 2.4.5版本中 Spark Streaming
已被弃用,我建议大家已经熟悉了一下 Spark Structured Streaming
. 这方面的代码是这样的。
// Structured Streaming
val lines: DataFrame = spark.readStream
.format("text")
.option("path", "file://path/to/src/main/resources")
.option("maxFilesPerTrigger", "1")
.load()
val query = lines.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()