Google Dataflow:在流式管道中的 BigQuery 中插入 + 更新

问题描述 投票:0回答:2

主要对象

一个Python流管道,我在其中读取来自pub/sub的输入。

分析输入后,有两个选项可用:

  • 如果 x=1 -> 插入
  • 如果 x=2 -> 更新

测试

  • 使用apache beam功能无法完成此操作,因此需要使用BigQuery的0.25 API进行开发(目前这是Google Dataflow支持的版本)

问题

  • 插入的记录仍在BigQuery缓冲区中,因此更新语句失败:

         UPDATE or DELETE statement over table table would affect rows in the streaming buffer, which is not supported
    

代码

插入

def insertCanonicalBQ(input):
    from google.cloud import bigquery
    client = bigquery.Client(project='project')
    dataset = client.dataset('dataset')
    table = dataset.table('table' )
    table.reload()
    table.insert_data(
        rows=[[values]])

更新

def UpdateBQ(input):
    from google.cloud import bigquery
    import uuid
    import time
    client = bigquery.Client()
    STD= "#standardSQL"
    QUERY= STD + "\n" + """UPDATE table SET field1 = 'XXX' WHERE field2=  'YYY'"""
    client.use_legacy_sql = False    
    query_job = client.run_async_query(query=QUERY, job_name='temp-query-job_{}'.format(uuid.uuid4()))  # API request
    query_job.begin()
    while True:
         query_job.reload()  # Refreshes the state via a GET request.
         if query_job.state == 'DONE':
             if query_job.error_result:
                 raise RuntimeError(query_job.errors)
             print "done"
             return input
             time.sleep(1)
python google-bigquery google-cloud-dataflow apache-beam
2个回答
3
投票

即使该行不在流缓冲区中,这仍然不是在 BigQuery 中解决此问题的方法。 BigQuery 存储更适合批量突变,而不是像这样通过

UPDATE
突变单个实体。您的模式与我对事务性而非分析性用例的期望是一致的。

为此考虑基于附加的模式。每次处理实体消息时,都会通过流式插入将其写入 BigQuery。然后,在需要时,您可以通过查询获取所有实体的最新版本。

作为示例,让我们假设一个任意模式:

idfield
是您的唯一实体键/标识符,
message_time
表示消息发出的时间。您的实体可能有许多其他字段。要获取实体的最新版本,我们可以运行以下查询(并可能将其写入另一个表):

#standardSQL
SELECT
  idfield,
  ARRAY_AGG(
    t ORDER BY message_time DESC LIMIT 1
  )[OFFSET(0)].* EXCEPT (idfield)
FROM `myproject.mydata.mytable` AS t
GROUP BY idfield

这种方法的另一个优点是它还允许您在任意时间点执行分析。要对实体一小时前的状态进行分析,只需添加一个 WHERE 子句:

WHERE message_time <= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)


0
投票

我很高兴地宣布,公共预览版现在支持通过 BigQuery Storage Write API* 对最近的流数据进行变异 DML 语句(UPDATE、DELETE、MERGE)!在此处查看该功能以及如何将您的项目列入白名单:https://cloud.google.com/bigquery/docs/write-api#use_data_manipulation_language_dml_with_recently_streamed_data

*此功能仅支持最近通过 BigQuery Storage Write API 传输的数据,而不支持旧版 insertAll 流式传输 API。

© www.soinside.com 2019 - 2024. All rights reserved.