在Spark结构化流中对createOrReplaceTempView进行查询时出现错误。

问题描述 投票:0回答:1

下面是我在spark结构化流媒体中的foreachBatch代码。

     df.writeStream.trigger(Trigger.ProcessingTime("10 seconds")).foreachBatch((batchDF: DataFrame, batchId: Long) => {

     batchDF.persist

     batchDF.createOrReplaceTempView("all_notifis");

      batchDF.write.mode(SaveMode.Append).saveAsTable("api_notifications_topics");

       val meta_data= spark.sql("select topic,partition,max(msg_timestamp) as msg_ts ,max(off_set) as max_offset from all_notifis  group by topic,partition")

      meta_data.write.mode(SaveMode.Append).saveAsTable("api_notifics_metadata");

      batchDF.unpersist()

     }).start().awaitTermination()

尽管我创建了tempview("all_notifis"),但它试图从hive默认DB中获取该表,并抛出以下错误。

原因是:org.apache.spark.sql.catalyst.analysis.NoSuchTableException。表或视图'all_notifis'在数据库'default'中找不到。

有谁能帮我解决这个问题吗?

spark-structured-streaming
1个回答
0
投票

LocalTempView是指会话范围内的本地临时视图。它的寿命是创建它的会话的寿命,即当会话终止时,它将自动放弃。它不与任何数据库绑定,也就是说,我们不能使用 db1.view1 来引用一个本地临时视图。

可以尝试是否可以通过 batchDF.all_notifisdb1.all_notifis

如果它不工作,然后替换您的视图创建使用。

     batchDF.createOrReplaceTempView("all_notifis");

并使用 global_temp.all_notifis

© www.soinside.com 2019 - 2024. All rights reserved.