Sample of the data I am working with:
Source
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
|store_id |type |store_status | name | owner |owner_code |store_asOfDate |
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
| 123 |type |not_active |name |xyz | xyz | 2024-03-15|
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
Target
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
|store_id |type |store_status | name | owner |owner_code |store_asOfDate |
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
| 123 |type |active |name |xyz | xyz | 2024-03-20|
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
Code snippet
target_dt.alias("target") \
    .merge(
        source=df_trusted.alias("source"),
        condition="target.store_id=source.store_id AND target.store_status=source.store_status"
    ) \
    .whenNotMatchedBySourceUpdate(
        set={
            "store_status": F.col("source.store_status"),
            "store_asOfDate": F.col("source.store_asOfDate")
        }
    ) \
    .execute()
Expected behavior: store_status and store_asOfDate are updated. Currently, an error is thrown:
24/03/21 14:06:29 ERROR Error occured in my_method() method: [DELTA_MERGE_UNRESOLVED_EXPRESSION] Cannot resolve source.store_id in UPDATE condition given columns....
Please suggest where I can look to debug the root cause further. Thanks in advance!
I assume your goal is to end up with a DeltaTable that looks like this:
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
|store_id |type |store_status | name | owner |owner_code |store_asOfDate |
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
| 123 |type |not_active |name |xyz | xyz | 2024-03-15|
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
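The problem is the way you are trying to find matching rows in the merge statement. whenNotMatchedBySourceUpdate applies to target rows that have no matching row in the source, so its expressions can only reference target columns; that is why the source column in the error message cannot be resolved. In addition, store_status is part of the merge condition and differs between the two tables, so the rows never match in the first place.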
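To see why the original condition leaves nothing to update from the source, here is a toy model in plain Python. It is only a sketch of how a merge sorts target rows into "matched" and "not matched by source"; it does not use the Delta Lake API, and the classify function and the row dictionaries are made up for illustration from the sample data in the question.

```python
# Toy model of MERGE row classification; plain Python, NOT the Delta Lake API.
# Rows mirror the sample data from the question.
target_rows = [
    {"store_id": 123, "store_status": "active", "store_asOfDate": "2024-03-20"},
]
source_rows = [
    {"store_id": 123, "store_status": "not_active", "store_asOfDate": "2024-03-15"},
]


def classify(target_rows, source_rows, keys):
    """Return (matched (target, source) pairs, target rows without a source match)."""
    matched, not_matched_by_source = [], []
    for t in target_rows:
        partners = [s for s in source_rows if all(t[k] == s[k] for k in keys)]
        if partners:
            matched.extend((t, s) for s in partners)
        else:
            # No source row is attached here, so there is nothing to read
            # "source.store_status" from in this branch.
            not_matched_by_source.append(t)
    return matched, not_matched_by_source


# Original condition: store_status is part of the key, so nothing matches.
matched_orig, unmatched_orig = classify(
    target_rows, source_rows, ["store_id", "store_status"]
)

# Key on store_id only: the row matches and the source values are available.
matched_fix, unmatched_fix = classify(target_rows, source_rows, ["store_id"])
```

With the original two-column key the target row lands in the "not matched by source" bucket, where no source values exist. With store_id alone it lands in the matched bucket, which is the case a whenMatchedUpdate clause handles.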
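Let me rewrite your code in a more general way, so that you can list more identifying columns in the merge condition.
The events alias is the table that already exists, since it contains the event records. The updates alias is the table that should contain the rows to update.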
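from pyspark.sql import functions as f

# Columns that identify a row, and columns to overwrite on a match
merge_keys = ["store_id"]
cols_to_update = ["store_status", "store_asOfDate"]

(
    target_dt.alias("events")
    .merge(
        source=df_trusted.alias("updates"),
        # Builds e.g. "events.`store_id` = updates.`store_id`"
        condition=" AND ".join([f"events.`{x}` = updates.`{x}`" for x in merge_keys])
    )
    # Copy the listed columns from the source row into the matching target row
    .whenMatchedUpdate(set={x: f"updates.`{x}`" for x in cols_to_update})
    .execute()
)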
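If you want to check what the two comprehensions above actually hand to merge(...) and whenMatchedUpdate(...), you can build the same strings without Spark. This is a minimal sketch; the helper names build_merge_condition and build_update_set are hypothetical and not part of Delta Lake, and owner_code is used as a second key purely for illustration.

```python
def build_merge_condition(keys, target_alias="events", source_alias="updates"):
    """Join one equality per key column into a single SQL condition string."""
    return " AND ".join(
        f"{target_alias}.`{k}` = {source_alias}.`{k}`" for k in keys
    )


def build_update_set(columns, source_alias="updates"):
    """Map each target column name to the matching source column expression."""
    return {c: f"{source_alias}.`{c}`" for c in columns}


condition_one_key = build_merge_condition(["store_id"])
condition_two_keys = build_merge_condition(["store_id", "owner_code"])
update_set = build_update_set(["store_status", "store_asOfDate"])

print(condition_one_key)
print(condition_two_keys)
print(update_set)
```

Printing the strings before running the merge makes it easy to spot a column that should not be part of the key, such as store_status in the original condition.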
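Also, please avoid writing Python code with \ line continuations: PEP 8 discourages them in favor of implicit continuation. Wrap the expression in parentheses instead.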
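As a small illustration of the two styles, the sketch below chains the same string methods once with backslashes and once inside parentheses. The sample string is arbitrary; the point is only that both forms evaluate to the same result, while the parenthesized form tolerates comments and trailing whitespace between the lines.

```python
raw = "  Store_Status  "

# Backslash continuation: a stray space after any "\" is a syntax error.
with_backslashes = raw.strip() \
    .lower() \
    .replace("_", " ")

# Implicit continuation inside parentheses: same result, easier to edit.
with_parentheses = (
    raw.strip()
    .lower()               # comments are allowed between the chained calls
    .replace("_", " ")
)

print(with_backslashes == with_parentheses)
```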