Apache Beam WriteToBigQuery write optimization

Problem description

I have a streaming pipeline in which I apply a 30-second window and group elements into ("customerId" + "siteId", element) tuples. How can I apply a dynamic table name based on companyId + siteId, along the lines of: f'{GCP_PROJECT}.{customerId}.{siteId}'?

Is there a way to grab the first tuple, which carries the two variables needed to build the table name? How can this be done efficiently?

bq_results = (
    pipeline_bq
    | 'Add Timestamp' >> beam.Map(
        lambda elem: beam.window.TimestampedValue(elem, int(elem['measurement_time'])))
    | 'Window into window_size Seconds' >> beam.WindowInto(
        beam.window.FixedWindows(window_size),
        allowed_lateness=beam.window.Duration(seconds=30))
    | 'Creating tuple' >> beam.Map(
        lambda element: ((element.get("companyId", ""), element.get("siteId", "")), element))
    | 'Grouping by key' >> beam.GroupByKey()
    | 'Eliminating the key value pair' >> beam.FlatMap(lambda kv_pair: kv_pair[1])
    | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
        table_fn,
        schema=schema,
        method="STREAMING_INSERTS",
    )
)
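For what it's worth, WriteToBigQuery's `table` argument also accepts a callable that is invoked once per element, which sidesteps the need to extract a "first tuple" at all. A minimal sketch, assuming each element dict still carries the `companyId` and `siteId` fields (the project id here is a hypothetical placeholder):

```python
GCP_PROJECT = "my-project"  # hypothetical project id

def table_fn(element):
    # WriteToBigQuery invokes this callable for each element; it must
    # return a fully qualified "project:dataset.table" spec string.
    # Assumes companyId/siteId are still present on the element dict,
    # i.e. they were not stripped out before the write step.
    return f'{GCP_PROJECT}:{element["companyId"]}.{element["siteId"]}'

# Passed as: beam.io.WriteToBigQuery(table=table_fn, schema=schema, ...)
```

This only works if the fields survive until the write step, so keep them on the element rather than moving them exclusively into the grouping key.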
Tags: google-bigquery, google-cloud-dataflow, apache-beam-io
1 Answer

I get the following error (every 2-5 minutes) with a window duration of 120 seconds (2m):

BigQuery/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)-ptransform-59, state=process-msecs} for at least 26399.08 seconds without outputting or completing.

using the following WriteToBigQuery call:

| 'Write to BigQuery' >> beam.io.WriteToBigQuery(
    table=lambda element: f'{GCP_PROJECT}:test_dataset.{element.get("siteId")}',
    schema=schema,
    method="STREAMING_INSERTS",
    triggering_frequency=120,
    with_auto_sharding=True,
    batch_size=200,
)

Are there any suggestions for improving WriteToBigQuery performance?
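One option worth trying (an assumption on my part, not verified against this pipeline) is to switch from per-row streaming inserts to batch file loads, which trades per-row latency for fewer, larger BigQuery load jobs and avoids streaming-insert throughput pressure. A configuration sketch, reusing the `schema` and `GCP_PROJECT` names from the question:

```python
# Configuration sketch: FILE_LOADS batches rows into a load job every
# triggering_frequency seconds instead of streaming each row.
# Note: batch_size only applies to STREAMING_INSERTS, so it is dropped here.
| 'Write to BigQuery' >> beam.io.WriteToBigQuery(
    table=lambda element: f'{GCP_PROJECT}:test_dataset.{element.get("siteId")}',
    schema=schema,
    method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
    triggering_frequency=120,   # start a load job every 2 minutes
    with_auto_sharding=True,
)
```

Whether this helps depends on how fresh the data needs to be in BigQuery; load jobs only land data at each triggering interval.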
