主要对象
一个Python流管道,我在其中读取来自pub/sub的输入。
分析输入后,有两个选项可用:
测试
问题
插入的记录仍在BigQuery缓冲区中,因此更新语句失败:
UPDATE or DELETE statement over table table would affect rows in the streaming buffer, which is not supported
代码
插入
def insertCanonicalBQ(input):
from google.cloud import bigquery
client = bigquery.Client(project='project')
dataset = client.dataset('dataset')
table = dataset.table('table' )
table.reload()
table.insert_data(
rows=[[values]])
更新
def UpdateBQ(input):
from google.cloud import bigquery
import uuid
import time
client = bigquery.Client()
STD= "#standardSQL"
QUERY= STD + "\n" + """UPDATE table SET field1 = 'XXX' WHERE field2= 'YYY'"""
client.use_legacy_sql = False
query_job = client.run_async_query(query=QUERY, job_name='temp-query-job_{}'.format(uuid.uuid4())) # API request
query_job.begin()
while True:
query_job.reload() # Refreshes the state via a GET request.
if query_job.state == 'DONE':
if query_job.error_result:
raise RuntimeError(query_job.errors)
print "done"
return input
time.sleep(1)
即使该行不在流缓冲区中,这仍然不是在 BigQuery 中解决此问题的方法。 BigQuery 存储更适合批量突变,而不是像这样通过
UPDATE
突变单个实体。您的模式与我对事务性而非分析性用例的期望是一致的。
为此考虑基于附加的模式。每次处理实体消息时,都会通过流式插入将其写入 BigQuery。然后,在需要时,您可以通过查询获取所有实体的最新版本。
作为示例,让我们假设一个任意模式:
idfield
是您的唯一实体键/标识符,message_time
表示消息发出的时间。您的实体可能有许多其他字段。要获取实体的最新版本,我们可以运行以下查询(并可能将其写入另一个表):
#standardSQL
SELECT
idfield,
ARRAY_AGG(
t ORDER BY message_time DESC LIMIT 1
)[OFFSET(0)].* EXCEPT (idfield)
FROM `myproject.mydata.mytable` AS t
GROUP BY idfield
这种方法的另一个优点是它还允许您在任意时间点执行分析。要对实体一小时前的状态进行分析,只需添加一个 WHERE 子句:
WHERE message_time <= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
我很高兴地宣布,公共预览版现在支持通过 BigQuery Storage Write API* 对最近的流数据进行变异 DML 语句(UPDATE、DELETE、MERGE)!在此处查看该功能以及如何将您的项目列入白名单:https://cloud.google.com/bigquery/docs/write-api#use_data_manipulation_language_dml_with_recently_streamed_data。
*此功能仅支持最近通过 BigQuery Storage Write API 传输的数据,而不支持旧版 insertAll 流式传输 API。