如何使用Python中的参数化sql查询从包含结构数组的列表中更新bigquery中的行

问题描述 投票:0回答:1

我在谷歌云中有一个云功能,它使用流插入将有关图像的数据加载到bigquery中。每个图像都分为不同的部分,因此我在表格中为图像的每个部分都有一行。我想稍后使用单个 sql 查询添加一些附加信息来更新这些行,但我的表包含结构数组。我目前正在 python 中生成更新的信息,并将每一行保存在字典中,并创建一个包含我想要更新的所有字典的列表。现在,我想使用此行更新列表通过参数化 SQL 查询来更新表中的相关行。数据如下:

row_updates = [{'userID': 'user1', 'timestamp': 1678755601, 'x': 89.01, 'y': 101.01, 'z': 6, 'section': 1, 'eyeDetection': [{'eyeDetected': 'true', 'dateComputed': 1678755100}]}, {'userID': 'user1', 'timestamp': 1678755601, 'x': 109.01, 'y': 102.01, 'z': 6, 'section': 2, 'eyeDetection': [{'eyeDetected': 'true', 'dateComputed': 1678755100}]}]

我的表根据时间戳进行分区,根据用户 ID 进行集群,并包含一些字段和具有以下架构的结构数组: 字段名 类型 模式 需要 1 级用户 ID 字符串
1 级时间戳 TIMESTAMP REQUIRED
级别 1 x FLOAT NULLABLE
级别 1 y FLOAT NULLABLE
级别 1 z FLOAT NULLABLE
1 级 stack_num INTEGER NULLABLE
1 级眼睛检测记录重复
2 级 eyeDetected STRING NULLABLE
2 级日期计算的 INTEGER NULLABLE

我不完全理解如何正确设置 query_parameters 来发送我的特定数据或使用 sql 处理它。我已阅读此处的文档https://cloud.google.com/bigquery/docs/parameterized-queries并查看了相关的客户端库类文档https://cloud.google.com/python/docs/reference /bigquery/latest/google.cloud.bigquery.query.ArrayQueryParameter 但我无法让它使用我的数据。我的目标是使用 ArrayQueryParameter 加载我的列表,然后取消嵌套并在 sql 查询中将其全部匹配,但我不确定如何做到这一点或是否可能。 这是我的代码:

from google.cloud import bigquery


client = bigquery.Client()

row_updates = [{'userID': 'user1', 'timestamp': 1678755601, 'x': 89.01, 'y': 101.01, 'z': 6, 'section': 1, 'eyeDetection': [{'eyeDetected': 'true', 'dateComputed': 1678755100}]}, {'userID': 'user1', 'timestamp': 1678755601, 'x': 109.01, 'y': 102.01, 'z': 6, 'section': 2, 'eyeDetection': [{'eyeDetected': 'true', 'dateComputed': 1678755100}]}]

update_query = f"""
    UPDATE `{dataset_name}.{table_name}` main
    SET
        main.x = TEMP.x,
        main.y = TEMP.y,
        main.z = TEMP.z,
        main.eyeDetection = TEMP.eyeDetection
    FROM UNNEST(@row_updates) as TEMP
    WHERE main.userID = TEMP.userID AND
    main.timestamp = TEMP.timestamp AND
    main.stack_num = TEMP.stack_num;
"""

query_params = [
    bigquery.ArrayQueryParameter(
        "row_updates",
        "STRUCT<userID STRING, timestamp TIMESTAMP, x FLOAT64, y FLOAT64, z FLOAT64, stack_num INT64, eyeDetection ARRAY<STRUCT<eyeDetected STRING, dateComputed INT64>>>",
        row_updates
    )
]

# Execute the parameterized update query
job_config = bigquery.QueryJobConfig(query_parameters=query_params)
client.query(update_query, job_config=job_config).result()

当我运行这个时,我得到

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "C:\Users\dsuit\Documents\files\.virtualenvs\repo-env2\lib\site-packages\google\cloud\bigquery\client.py", line 3334, in query
    return _job_helpers.query_jobs_insert(
  File "C:\Users\dsuit\Documents\files\.virtualenvs\repo-env2\lib\site-packages\google\cloud\bigquery\_job_helpers.py", line 114, in query_jobs_insert
    future = do_query()
  File "C:\Users\dsuit\Documents\files\.virtualenvs\repo-env2\lib\site-packages\google\cloud\bigquery\_job_helpers.py", line 91, in do_query
    query_job._begin(retry=retry, timeout=timeout)
  File "C:\Users\dsuit\Documents\files\.virtualenvs\repo-env2\lib\site-packages\google\cloud\bigquery\job\query.py", line 1298, in _begin
    super(QueryJob, self)._begin(client=client, retry=retry, timeout=timeout)
  File "C:\Users\dsuit\Documents\files\.virtualenvs\repo-env2\lib\site-packages\google\cloud\bigquery\job\base.py", line 510, in _begin
    api_response = client._call_api(
  File "C:\Users\dsuit\Documents\files\.virtualenvs\repo-env2\lib\site-packages\google\cloud\bigquery\client.py", line 759, in _call_api
    return call()
    return retry_target(
  File "C:\Users\dsuit\Documents\files\.virtualenvs\repo-env2\lib\site-packages\google\api_core\retry.py", line 190, in retry_target
    return target()
  File "C:\Users\dsuit\Documents\files\.virtualenvs\repo-env2\lib\site-packages\google\cloud\_http\__init__.py", line 494, in api_request
google.api_core.exceptions.BadRequest: 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/my-project-test/jobs?prettyPrint=false: Invalid value for type: STRUCT<userID STRING, timestamp TIMESTAMP, x FLOAT64, y FLOAT64, z FLOAT64, stack_num INT64, eyeDetection ARRAY<STRUCT<eyeDetected STRING, dateComputed INT64>>> is not a valid value

如果我修改 query_params 以使用 Struct 和 ScalarQueryParameters,则需要我仅包含单行的值。 对其他解决方案开放,只要它们允许列表中的所有更新在单个查询中发生即可。如果可以避免的话,我不想加载到临时表中并从那里合并。

google-bigquery
1个回答
0
投票

我最近决定尝试重新审视这个问题,因为我对 bigquery 有更多的经验并找到了解决方案。我重组了表以使用具有多个字段的单个 NULLABLE RECORD 类型,而不是通过 REPEATABLE RECORD 类型使用 STRUCTS 数组。这样我就可以使用列表中的数据更新整个 eyeDetected 字段。然后,我使用基于列表中的数据动态生成的查询字符串,并将其提供给 MERGE DML 命令以立即更新所有行。这就是上面示例的样子:

MERGE INTO {table_name} AS target
USING (SELECT * FROM UNNEST([{row_updates}]) ) AS source
ON target.userID = source.userID AND target.timestamp = 
source.timestamp AND target.stack_num = source.stack_num
WHEN MATCHED THEN
UPDATE SET eyeDetection = source.eyeDetection;

此方法的最大问题是您无法使用 DML 语句对当前位于流缓冲区中的行进行更新。除非您想使用临时表/临时表,否则任何更新都必须等待 90 分钟以上才能尝试。

© www.soinside.com 2019 - 2024. All rights reserved.