我有一个增量表,其结构如下:
id | col1 | .... | colN
“....”表示有大量列
存储帐户中每天都会存储一个 JSON。我可以使用卷从 STAC 中读取内容。 JSON 的结构如下,例如:
[
{ "id": 1, "col_name": "col3", value: "update_me" },
{ "id": 3, "col_name": "col6", value: "update_me_too" }
]
像这样的 JSON 数组中最多可能有 100 个条目
必须使用此 JSON 更改增量表。例如,上例中的第一个条目指出,对于 id 1,col3 中的条目必须更改为“update_me”。第二个条目指出,对于 id 3,col6 必须更改为“update_me_to”。
更改应每天在批处理作业中执行。
我正在绞尽脑汁地思考如何才能有效地解决这个问题,但不知道......
我确实尝试迭代 JSON 条目和
.update
Delta 表,但我无法获得正确的语法,而且速度非常慢。
我尝试过以下方法:
delta_table = DeltaTable.forPath(spark, "/FileStore/tables/diliptbl")
for row in json_df.collect():
id = row["id"]
col_name = row["col_name"]
value = row["value"]
if isinstance(value, str):
value = lit(value)
delta_table.update(condition=expr("id = {} and {} is not null".format(id, col_name)), set={col_name: value})
delta_table.toDF().show()
结果:
+---+---------------+---------------+----+
| id| col1| col2|col3|
+---+---------------+---------------+----+
| 3| quux|updated_value_3| 30|
| 2|updated_value_2| qux| 20|
| 1| foo| bar| 100|
+---+---------------+---------------+----+
在上面的代码中,我创建了一个 Delta 表并使用
DeltaTable.forPath()
方法读取它。
创建了一个名为 json_df 的 DataFrame。
迭代 json_df
中的每一行。
对于每一行,提取要更新的 id、col_name 和值。
检查该值是否为字符串,并在必要时将其转换为 lit 表达式,以确保更新操作的类型正确。
为 json_df 中的每一行调用 delta_table 对象的 update 方法。
使用condition参数根据id和col_name指定要更新的行,并使用set参数指定要使用新值更新的列(col_name)。