Apache Beam使用多个表时要写多少次

问题描述 投票:0回答:1

我正在使用Apache Beam从PubSub中读取消息并将其写入BigQuery。我想要做的就是根据输入中的信息写入多个表。为了减少写入量,我在来自PubSub的输入上使用窗口。

一个小例子:

messages
    .apply(new PubsubMessageToTableRow(options))
    .get(TRANSFORM_OUT)
    .apply(ParDo.of(new CreateKVFromRow())
    .apply(Window.into(FixedWindows.of(Duration.standardMinutes(10L))))
    // group by key
    .apply(GroupByKey.create())
    // Are these two rows what I want?
    .apply(Values.create())
    .apply(Flatten.iterables())
    .apply(BigQueryIO.writeTableRows()
          .withoutValidation()
          .withCreateDisposition(CreateDisposition.CREATE_NEVER)
          .withWriteDisposition(WriteDisposition.WRITE_APPEND)
          .withExtendedErrorInfo()
          .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
          .to((SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>) input -> {
                                        // Simplified for readability
                                        Integer destination = (Integer) input.getValue().get("key");
                                        return new TableDestination(
                                                new TableReference()
                                                        .setProjectId(options.getProjectID())
                                                        .setDatasetId(options.getDatasetID())
                                                        .setTableId(destination + "_Table"),
                                                "Table Destination");
                                    }));

我在文档中找不到任何内容,但是我想知道对每个窗口进行了多少次写操作?如果这些是多个表,那么是否为窗口中的所有元素的每个表写一个?还是每个元素一次,如每个表的元素可能不同?

java google-bigquery google-cloud-dataflow apache-beam
1个回答
0
投票

由于您使用PubSub作为源,因此您的工作似乎是流式作业。因此,默认的插入方法是STREAMING_INSERTS(请参阅docs)。我看不出使用此方法减少写入的任何益处或理由,因为Billig基于数据的大小。顺便说一句,您的示例或多或少没有真正有效地减少写入。

尽管它是一个流作业,但是由于一些版本,FILE_LOADS方法也受支持。如果withMethod设置为FILE_LOADS,则可以在withTriggeringFrequency上定义BigQueryIO。此设置定义加载作业发生的频率。在这里,连接器可以为您处理所有操作,您无需按键或窗口数据分组。将为每个表启动加载作业。

由于似乎需要一些时间才能将数据存储在BigQuery中,所以对您来说完全没问题,我建议使用FILE_LOADS,因为加载是免费的,而不是流插入。定义触发频率时只需记住quotas

© www.soinside.com 2019 - 2024. All rights reserved.