我正在 AWS 粘合作业中使用以下代码创建一个 Iceberg 表:
df.writeTo(f'glue_catalog.{DATABASE_NAME}.{TABLE_NAME}') \
.using('iceberg') \
.tableProperty("location", TABLE_LOCATION) \
.tableProperty("write.spark.accept-any-schema", "true") \
.tableProperty("format-version", "2") \
.createOrReplace()
表已创建,我可以在 Glue/LF 中看到它,并且可以在 Athena 中查询它。
我有另一项工作正在尝试使用以下方法更新插入数据:
df_upsert.createOrReplaceTempView("upsert_items")
upsert_query = f"""
MERGE INTO glue_catalog.{DATABASE_NAME}.{TABLE_NAME} target
USING (SELECT * FROM upsert_items) updates
ON {join_condidtion}
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
"""
spark.sql(upsert_query)
GlueJob 失败并显示:
AnalysisException: cannot resolve my_column in MERGE command given columns [updates.col1, updates.col2, ...
当列可能丢失或可能添加列时,如何合并新数据。我认为 Iceberg 会通过为缺失/新列填充 NULL 来处理这个问题,因为我设置了“write.spark.accept-any-schema”= true。谢谢你。
运行 Spark 版本 3.3.0-amzn-1
AWS GlueJob v4
冰山 v1.0.0
根据文档:
https://iceberg.apache.org/docs/latest/spark-writes/#spark-type-to-iceberg-type
目前无法通过
spark.sql("MERGE ...")
实现此操作。
有一个开放的功能请求问题来处理这个问题。
一个“非最佳”解决方案是检测是否在源中找到了某个列,但在目标中尚未找到该列,然后在 MERGE 语句之前执行 和
ALTER TABLE target ADD COLUMN
。 🤷u200d♂️🤷u200d♂️🤷u200d♂️