I have a streaming pipeline in which I apply a 30-second window and group elements into ("customerId" + "siteId", element) tuples. How can I apply a dynamic table name based on companyId + siteId, along the lines of this: f'{GCP_PROJECT}.{customerId}.{siteId}'?
Is there a way to get the first tuple of a group, since it carries the two variables needed for the table name? And how can this be done efficiently?
bq_results = (
    pipeline_bq
    | 'Add Timestamp' >> beam.Map(lambda elem: beam.window.TimestampedValue(elem, int(elem['measurement_time'])))
    | 'Window into window_size Seconds' >> beam.WindowInto(beam.window.FixedWindows(window_size), allowed_lateness=beam.window.Duration(seconds=30))
    # Key each element by (companyId, siteId) so grouping works on a 2-tuple.
    | 'Create key-value pair' >> beam.Map(lambda element: ((element.get("companyId", ""), element.get("siteId", "")), element))
    | 'Group by key' >> beam.GroupByKey()
    # Flatten the grouped values back into individual row dicts for the sink.
    | 'Eliminate the key' >> beam.FlatMap(lambda kv_pair: kv_pair[1])
    | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
        table_fn,
        schema=schema,
        method="STREAMING_INSERTS",
    )
)
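For the dynamic table name itself, beam.io.WriteToBigQuery accepts a callable as its table argument that is invoked once per element, so there is no need to fetch the first tuple of a group: each record can name its own destination. A minimal sketch of such a callable, assuming GCP_PROJECT holds the project id and every element dict carries the companyId and siteId fields used in the pipeline above:

```python
GCP_PROJECT = "my-project"  # hypothetical project id

def table_fn(element):
    # Each element names its own destination: dataset = companyId,
    # table = siteId (field names taken from the pipeline above).
    company_id = element.get("companyId") or "unknown"
    site_id = element.get("siteId") or "unknown"
    # Beam table specs use the "project:dataset.table" form.
    return f"{GCP_PROJECT}:{company_id}.{site_id}"
```

Passing this as WriteToBigQuery(table=table_fn, ...) routes every row without any per-window lookup; keep the callable cheap, since it runs for each element.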
I get the following error (every 2-5 minutes) with a window time of 120 seconds (2m):
BigQuery/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)-ptransform-59, state=process-msecs} for at least 26399.08 seconds without outputting or completing.
using the following WriteToBigQuery configuration:
| 'Write to BigQuery' >> beam.io.WriteToBigQuery(
    table=lambda element: f'{GCP_PROJECT}:test_dataset.{element.get("siteId")}',
    schema=schema,
    method="STREAMING_INSERTS",
    triggering_frequency=120,
    with_auto_sharding=True,
    batch_size=200,
)
Are there any suggestions for improving WriteToBigQuery performance?
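One thing worth checking: the streaming-insert sink batches rows per destination table, so a large number of distinct companyId/siteId combinations splinters each 200-row batch into many small requests. A rough mental model of that per-table batching (an illustration with field names assumed from the question, not the connector's actual code):

```python
from collections import defaultdict

def batch_by_table(elements, project):
    # Group rows by destination table spec so each insert request
    # carries rows for exactly one table; many distinct destinations
    # therefore mean many small batches.
    batches = defaultdict(list)
    for elem in elements:
        table = f'{project}:{elem.get("companyId")}.{elem.get("siteId")}'
        batches[table].append(elem)
    return dict(batches)
```

If the destination count is high, commonly suggested mitigations are raising batch_size and triggering_frequency, or trying method="STORAGE_WRITE_API" or method="FILE_LOADS", which tend to tolerate many dynamic destinations better than streaming inserts.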