Delta Lake Create Table具有类似的结构

问题描述 投票:0回答:1

我在“ / mnt / events-bronze”位置有一个铜级三角洲湖泊表(events_bronze),数据从kafka传输到该表。现在,我希望能够从此表中流式传输并使用“ foreachBatch”更新到一个银色表(events_silver)中。这可以使用青铜表作为源来实现。但是,在初始运行期间,由于events_silver不存在,我不断收到错误消息,说Delta表不存在,这很明显。那么,如何创建具有与events_bronze相同的结构的events_silver?我找不到DDL来做同样的事情。

def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  DeltaTable.forPath(spark, "/mnt/events-silver").as("silver")
    .merge(
      microBatchOutputDF.as("bronze"),
      "silver.id=bronze.id")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}
 events_bronze
      .writeStream
      .trigger(Trigger.ProcessingTime("120 seconds"))
      .format("delta")
      .foreachBatch(upsertToDelta _)
      .outputMode("update")
      .start()

在初始运行期间,问题在于没有为路径“ / mnt / events-silver”定义增量湖表。我不确定如何在首次运行时创建与“ / mnt / events-bronze”相同的结构。

apache-spark databricks spark-structured-streaming delta-lake
1个回答
0
投票

您可以使用spark SQL检查表。首先在spark SQL下运行,它将给出青铜表的表定义:

spark.sql("show create table event_bronze").show

获取DDL之后,只需将位置更改为Silver Table的路径并运行该语句就是spark SQL。

注意:请使用“如果不存在则创建表...”,因为它不会在并发运行中失败。

© www.soinside.com 2019 - 2024. All rights reserved.