如何一起使用SparkSession和StreamingContext?

问题描述 投票:1回答:1

我正在尝试从本地计算机(OSX)上的文件夹中流式传输CSV文件。我像这样一起使用SparkSession和StreamingContext:

val sc: SparkContext = createSparkContext(sparkContextName)
val sparkSess = SparkSession.builder().config(sc.getConf).getOrCreate()
val ssc = new StreamingContext(sparkSess.sparkContext, Seconds(time))

val csvSchema = new StructType().add("field_name",StringType)
val inputDF = sparkSess.readStream.format("org.apache.spark.csv").schema(csvSchema).csv("file:///Users/userName/Documents/Notes/MoreNotes/tmpFolder/")

如果我在此之后运行ssc.start(),我会收到此错误:

java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute

相反,如果我尝试像这样启动SparkSession

inputDF.writeStream.format("console").start()

我明白了:

java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.

很明显,我不明白SparkSessionStreamingContext应该如何合作。如果我摆脱SparkSessionStreamingContext只有textFileStream,我需要在其上施加CSV模式。非常感谢有关如何使其工作的任何说明。

scala apache-spark spark-dataframe spark-streaming
1个回答
3
投票

你不能一起拥有一个火花会话和火花环境。随着Spark 2.0.0的发布,开发人员可以使用一种新的抽象--Spark会话 - 可以实例化并调用,就像以前可用的Spark Context一样。

您仍然可以从spark会话构建器访问spark上下文:

 val sparkSess = SparkSession.builder().appName("My App").getOrCreate()
 val sc = sparkSess.sparkContext
 val ssc = new StreamingContext(sc, Seconds(time))

导致工作失败的另一个原因是您正在执行转换并且不会调用任何操作。最后应该调用一些动作,例如inputDF.show()

© www.soinside.com 2019 - 2024. All rights reserved.