我在“ / mnt / events-bronze”位置有一个铜级三角洲湖泊表(events_bronze),数据从kafka传输到该表。现在,我希望能够从此表中流式传输并使用“ foreachBatch”更新到一个银色表(events_silver)中。这可以使用青铜表作为源来实现。但是,在初始运行期间,由于events_silver不存在,我不断收到错误消息,说Delta表不存在,这很明显。那么,如何创建具有与events_bronze相同的结构的events_silver?我找不到DDL来做同样的事情。
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
DeltaTable.forPath(spark, "/mnt/events-silver").as("silver")
.merge(
microBatchOutputDF.as("bronze"),
"silver.id=bronze.id")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
}
events_bronze
.writeStream
.trigger(Trigger.ProcessingTime("120 seconds"))
.format("delta")
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
在初始运行期间,问题在于没有为路径“ / mnt / events-silver”定义增量湖表。我不确定如何在首次运行时创建与“ / mnt / events-bronze”相同的结构。
您可以使用spark SQL检查表。首先在spark SQL下运行,它将给出青铜表的表定义:
spark.sql("show create table event_bronze").show
获取DDL之后,只需将位置更改为Silver Table的路径并运行该语句就是spark SQL。
注意:请使用“如果不存在则创建表...”,因为它不会在并发运行中失败。