下面是我在spark结构化流媒体中的foreachBatch代码。
df.writeStream.trigger(Trigger.ProcessingTime("10 seconds")).foreachBatch((batchDF: DataFrame, batchId: Long) => {
batchDF.persist
batchDF.createOrReplaceTempView("all_notifis");
batchDF.write.mode(SaveMode.Append).saveAsTable("api_notifications_topics");
val meta_data= spark.sql("select topic,partition,max(msg_timestamp) as msg_ts ,max(off_set) as max_offset from all_notifis group by topic,partition")
meta_data.write.mode(SaveMode.Append).saveAsTable("api_notifics_metadata");
batchDF.unpersist()
}).start().awaitTermination()
尽管我创建了tempview("all_notifis"),但它试图从hive默认DB中获取该表,并抛出以下错误。
原因是:org.apache.spark.sql.catalyst.analysis.NoSuchTableException。表或视图'all_notifis'在数据库'default'中找不到。
有谁能帮我解决这个问题吗?
LocalTempView是指会话范围内的本地临时视图。它的寿命是创建它的会话的寿命,即当会话终止时,它将自动放弃。它不与任何数据库绑定,也就是说,我们不能使用
db1.view1
来引用一个本地临时视图。
可以尝试是否可以通过 batchDF.all_notifis
或 db1.all_notifis
如果它不工作,然后替换您的视图创建使用。
batchDF.createOrReplaceTempView("all_notifis");
并使用 global_temp.all_notifis