从Spark Streaming中获取异常 "没有注册输出操作,所以没有执行"。

问题描述 投票:0回答:1
package com.scala.sparkStreaming

import org.apache.spark._
import org.apache.spark.streaming._

object Demo1 {
  def main(assdf:Array[String]){

     val sc=new SparkContext("local","Stream")

     val stream=new StreamingContext(sc,Seconds(2))

     val rdd1=stream.textFileStream("D:/My Documents/Desktop/inbound/sse/ssd/").cache()

     val mp1= rdd1.flatMap(_.split(","))
     print(mp1.count())

     stream.start()
     stream.awaitTermination()
  }
}

我已经运行了它,然后它显示一个异常的

org.apache.spark.streaming.dstream.MappedDStream@6342993220/05/22 18:14:16 ERROR StreamingContext: Error starting the context, marking it as stopped
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
    at scala.Predef$.require(Predef.scala:277)
    at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:169)
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:517)
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:577)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:576)
    at com.scala.sparkStreaming.Demo1$.main(Demo1.scala:18)
    at com.scala.sparkStreaming.Demo1.main(Demo1.scala)
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
    at scala.Predef$.require(Predef.scala:277)
    at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:169)
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:517)
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:577)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:576)
    at com.scala.sparkStreaming.Demo1$.main(Demo1.scala:18)
    at com.scala.sparkStreaming.Demo1.main(Demo1.scala)

apache-spark spark-streaming rdd spark-structured-streaming
1个回答
0
投票

错误信息 "没有注册任何输出操作,所以没有任何东西可以执行 "给出了一个提示,即缺少了一些东西。

你的直接流 rdd1mp1 没有 行动. A flatMap 仅是 转型 其中被Spark懒得评估。这就是为什么 stream.start()方法会抛出这个Exception。

根据文档,你可以 打印出一份RDD 如下图所示。由于你处理的是一个流,你可以通过RDDs迭代。我已经用Spark 2.4.5版本成功测试了下面的代码。

文档中的 textFileStream 说,它 "监控一个与Hadoop兼容的文件系统,以获取 新文件 并将其作为文本文件读取",所以请确保您添加modify您要读取的文件。 作业正在运行。

另外,虽然我对Windows上的Spark并不完全熟悉,但你可能需要将目录字符串改为

file://D:\\My Documents\\Desktop\\inbound\\sse\\ssd

下面是Spark Streaming的完整代码示例。

import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Main extends App {
  val sc=new SparkContext("local[1]","Stream")

  val stream=new StreamingContext(sc,Seconds(2))

  val rdd1 =stream.textFileStream("file:///path/to/src/main/resources")

  val mp1= rdd1.flatMap(_.split(" "))

  mp1.foreachRDD(rdd => rdd.collect().foreach(println(_)))

  stream.start()
  stream.awaitTermination()
}

在Spark 2.4.5版本中 Spark Streaming 已被弃用,我建议大家已经熟悉了一下 Spark Structured Streaming. 这方面的代码是这样的。

// Structured Streaming
  val lines: DataFrame = spark.readStream
    .format("text")
    .option("path", "file://path/to/src/main/resources")
    .option("maxFilesPerTrigger", "1")
    .load()

  val query = lines.writeStream
    .outputMode("append")
    .format("console")
    .start()

  query.awaitTermination()
© www.soinside.com 2019 - 2024. All rights reserved.