Delta merge fails to recognize existing columns

Problem description

An example of the data I am working with:

Source

+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
|store_id |type                |store_status |        name        |    owner           |owner_code    |store_asOfDate    |
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
|  123    |type                |not_active   |name                |xyz                 |    xyz       |        2024-03-15|
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+

Goal

+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
|store_id |type                |store_status |        name        |    owner           |owner_code    |store_asOfDate    |
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
|  123    |type                |active       |name                |xyz                 |    xyz       |        2024-03-20|
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+

Code snippet

target_dt.alias("target") \
    .merge(
        source=df_trusted.alias("source"),
        condition="target.store_id=source.store_id AND target.store_status=source.store_status"
    ) \
    .whenNotMatchedBySourceUpdate(
        set={
            "store_status": F.col("source.store_status"),
            "store_asOfDate": F.col("source.store_asOfDate")
        }
    ) \
    .execute()

Expected behavior

  • The target row's store_status and store_asOfDate are updated.

Currently, the following error is thrown:

24/03/21 14:06:29 ERROR Error occured in my_method() method: [DELTA_MERGE_UNRESOLVED_EXPRESSION] Cannot resolve source.store_id in UPDATE condition given columns....

Please suggest where I could dig further to debug the root cause. Thanks in advance!

python pyspark delta-lake
1 Answer

I think your goal is to end up with a DeltaTable that looks like this:

+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
|store_id |type                |store_status |        name        |    owner           |owner_code    |store_asOfDate    |
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
|  123    |type                |not_active   |name                |xyz                 |    xyz       |        2024-03-15|
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+

The problem is how you are trying to find matching rows in your merge statement. Because the merge condition also compares store_status, a row whose status has changed never matches; and the set expressions of a whenNotMatchedBySourceUpdate clause cannot reference source columns at all, since for those rows there is, by definition, no matching source row — which is exactly what the DELTA_MERGE_UNRESOLVED_EXPRESSION error is telling you.
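If you genuinely need a WHEN NOT MATCHED BY SOURCE clause (available in Delta Lake 2.3+), its set expressions must use target columns or literals only. A minimal sketch — the 'not_active' literal here is just an illustrative placeholder, not something from your data:

(
    target_dt.alias("events")
    .merge(
        source=df_trusted.alias("updates"),
        condition="events.store_id = updates.store_id"
    )
    # Flag target rows that no longer appear in the source feed.
    # Note: no updates.* references are allowed in this clause.
    .whenNotMatchedBySourceUpdate(
        set={"store_status": "'not_active'"}  # SQL string literal
    )
    .execute()
)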

Let me rewrite your code a bit more generically, so that you can configure additional identifying columns for the merge statement.

The events alias is the table that already exists, since it contains the event records. The updates alias is the table containing the rows to be applied as updates.

merge_keys = ["store_id"]
cols_to_update = ["store_status", "store_asOfDate"]
(
    target_dt.alias("events")
    .merge(
        source=df_trusted.alias("updates"),
        # Match only on the identifying columns, never on columns that change.
        condition=" AND ".join([f"events.`{x}` = updates.`{x}`" for x in merge_keys])
    )
    # Overwrite the mutable columns on every matched row.
    .whenMatchedUpdate(set={x: f"updates.`{x}`" for x in cols_to_update})
    .execute()
)
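For reference, here is how the pieces fit together end to end. This is a minimal sketch: the SparkSession with Delta support and the /tmp/stores path are my assumptions, not part of your question.

from delta.tables import DeltaTable

# Assumed setup: `spark` is a SparkSession with the Delta extensions enabled,
# and /tmp/stores is a hypothetical path holding the target Delta table.
target_dt = DeltaTable.forPath(spark, "/tmp/stores")
df_trusted = spark.createDataFrame(
    [(123, "not_active", "2024-03-15")],
    ["store_id", "store_status", "store_asOfDate"],
)

# ... run the merge above, then verify the updated row:
target_dt.toDF().filter("store_id = 123").show()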

Also, please do not write Python code with \ line continuations; they are discouraged in favor of implicit continuation inside parentheses (see PEP 8).
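For example, the same chained call reads better wrapped in parentheses:

# Backslash continuation (fragile: a space after the \ is a SyntaxError):
df = target_dt.toDF() \
    .filter("store_id = 123")

# Implicit continuation inside parentheses (preferred by PEP 8):
df = (
    target_dt.toDF()
    .filter("store_id = 123")
)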
