冰山模式不合并缺失的列

问题描述 投票:0回答:1

我正在 AWS 粘合作业中使用以下代码创建一个 Iceberg 表:

df.writeTo(f'glue_catalog.{DATABASE_NAME}.{TABLE_NAME}') \
    .using('iceberg') \
    .tableProperty("location", TABLE_LOCATION) \
    .tableProperty("write.spark.accept-any-schema", "true") \
    .tableProperty("format-version", "2") \
    .createOrReplace()

表已创建,我可以在 Glue/LF 中看到它,并且可以在 Athena 中查询它。

我有另一项工作正在尝试使用以下方法更新插入数据:

df_upsert.createOrReplaceTempView("upsert_items")
upsert_query = f"""
MERGE INTO glue_catalog.{DATABASE_NAME}.{TABLE_NAME} target
USING (SELECT * FROM upsert_items) updates
ON {join_condidtion}
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
"""
spark.sql(upsert_query)

GlueJob 失败并显示:

AnalysisException: cannot resolve my_column in MERGE command given columns [updates.col1, updates.col2, ...

当列可能丢失或可能添加列时,如何合并新数据。我认为 Iceberg 会通过为缺失/新列填充 NULL 来处理这个问题,因为我设置了“write.spark.accept-any-schema”= true。谢谢你。

运行 Spark 版本 3.3.0-amzn-1
AWS GlueJob v4
冰山 v1.0.0

pyspark aws-glue iceberg apache-iceberg
1个回答
0
投票

根据文档

https://iceberg.apache.org/docs/latest/spark-writes/#spark-type-to-iceberg-type

目前无法通过

spark.sql("MERGE ...")
实现此操作。 有一个开放的功能请求问题来处理这个问题。

一个“非最佳”解决方案是检测是否在源中找到了某个列,但在目标中尚未找到该列,然后在 MERGE 语句之前执行 和

ALTER TABLE target ADD COLUMN
。 🤷u200d♂️🤷u200d♂️🤷u200d♂️

© www.soinside.com 2019 - 2024. All rights reserved.