我正在尝试使用Twitter作为源执行Spark Streaming示例,如下所示:
public static void main (String.. args) {
SparkConf conf = new SparkConf().setAppName("Spark_Streaming_Twitter").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(2));
JavaSQLContext sqlCtx = new JavaSQLContext(sc);
String[] filters = new String[] {"soccer"};
JavaReceiverInputDStream<Status> receiverStream = TwitterUtils.createStream(jssc,filters);
jssc.start();
jssc.awaitTermination();
}
但我得到以下例外
Exception in thread "main" java.lang.AssertionError: assertion failed: No output streams registered, so nothing to execute
at scala.Predef$.assert(Predef.scala:179)
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:158)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:416)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:437)
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:501)
at org.learning.spark.TwitterStreamSpark.main(TwitterStreamSpark.java:53)
有任何建议如何解决这个问题?
调用输出运算符时,它会触发流的计算。
如果没有DStream上的输出操作符,则不会调用任何计算。基本上你需要在流上调用以下任何方法
print()
foreachRDD(func)
saveAsObjectFiles(prefix, [suffix])
saveAsTextFiles(prefix, [suffix])
saveAsHadoopFiles(prefix, [suffix])
http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations
您也可以先应用任何转换,然后根据需要输出函数。
线程“main”中的异常java.lang.AssertionError:断言失败:没有注册输出流,因此无需执行
TL; DR使用可用的output operators之一,如print
,saveAsTextFiles
或foreachRDD
(或较少使用的saveAsObjectFiles
或saveAsHadoopFiles
)。
换句话说,您必须在代码中的以下行之间使用输出运算符:
JavaReceiverInputDStream<Status> receiverStream = TwitterUtils.createStream(jssc,filters);
// --> The output operator here <--
jssc.start();
引用Spark官方文档的Output Operations on DStreams(突出我的):
输出操作允许将DStream的数据推送到外部系统,如数据库或文件系统。由于输出操作实际上允许外部系统使用转换后的数据,因此它们会触发所有DStream转换的实际执行(类似于RDD的操作)。
关键是没有输出操作符,你“没有注册输出流,所以没有任何东西可以执行”。
正如一位评论者注意到的那样,你必须使用输出转换,例如:在启动print
之前,foreachRDD
或StreamingContext
。
在内部,只要您使用其中一个可用的输出运算符,例如print
或foreach
,DStreamGraph
被要求add an output stream。
你可以在new ForEachDStream is created and registered之后找到注册(这正是add it as an output stream)。
它也 - 完全没有指责这个问题,但真正的原因是来自流输入的滑动窗口持续时间和RDD时间窗口之间的非多个数字。它只记录一个警告:你修复它,上下文停止失败:D