每天使用“类似更改源”的 JSON 更新 Delta 表

问题描述 投票:0回答:1

我有一个增量表,其结构如下:

id | col1 | .... | colN

“....”表示有大量列

存储帐户中每天都会存储一个 JSON。我可以使用卷从 STAC 中读取内容。 JSON 的结构如下,例如:

[
  { "id": 1, "col_name": "col3", value: "update_me" },
  { "id": 3, "col_name": "col6", value: "update_me_too" }
]

像这样的 JSON 数组中最多可能有 100 个条目

必须使用此 JSON 更改增量表。例如,上例中的第一个条目指出,对于 id 1,col3 中的条目必须更改为“update_me”。第二个条目指出,对于 id 3,col6 必须更改为“update_me_to”。

更改应每天在批处理作业中执行。

我正在绞尽脑汁地思考如何才能有效地解决这个问题,但不知道......

我确实尝试迭代 JSON 条目和

.update
Delta 表,但我无法获得正确的语法,而且速度非常慢。

pyspark databricks azure-databricks
1个回答
0
投票

我尝试过以下方法:

delta_table = DeltaTable.forPath(spark, "/FileStore/tables/diliptbl")
for row in json_df.collect():
    id = row["id"]
    col_name = row["col_name"]
    value = row["value"]
    
    if isinstance(value, str):
        value = lit(value)
    
    delta_table.update(condition=expr("id = {} and {} is not null".format(id, col_name)), set={col_name: value})
delta_table.toDF().show()

结果:

+---+---------------+---------------+----+
| id|           col1|           col2|col3|
+---+---------------+---------------+----+
|  3|           quux|updated_value_3|  30|
|  2|updated_value_2|            qux|  20|
|  1|            foo|            bar| 100|
+---+---------------+---------------+----+

在上面的代码中,我创建了一个 Delta 表并使用

DeltaTable.forPath()
方法读取它。 创建了一个名为 json_df 的 DataFrame。 迭代
json_df
中的每一行。 对于每一行,提取要更新的 id、col_name 和值。 检查该值是否为字符串,并在必要时将其转换为 lit 表达式,以确保更新操作的类型正确。 为 json_df 中的每一行调用 delta_table 对象的 update 方法。 使用condition参数根据id和col_name指定要更新的行,并使用set参数指定要使用新值更新的列(col_name)。

© www.soinside.com 2019 - 2024. All rights reserved.