我正在使用Apache Beam
从PubSub中读取消息并将其写入BigQuery。我想要做的就是根据输入中的信息写入多个表。为了减少写入量,我在来自PubSub的输入上使用窗口。
一个小例子:
messages
.apply(new PubsubMessageToTableRow(options))
.get(TRANSFORM_OUT)
.apply(ParDo.of(new CreateKVFromRow())
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(10L))))
// group by key
.apply(GroupByKey.create())
// Are these two rows what I want?
.apply(Values.create())
.apply(Flatten.iterables())
.apply(BigQueryIO.writeTableRows()
.withoutValidation()
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withExtendedErrorInfo()
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.to((SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>) input -> {
// Simplified for readability
Integer destination = (Integer) input.getValue().get("key");
return new TableDestination(
new TableReference()
.setProjectId(options.getProjectID())
.setDatasetId(options.getDatasetID())
.setTableId(destination + "_Table"),
"Table Destination");
}));
我在文档中找不到任何内容,但是我想知道对每个窗口进行了多少次写操作?如果这些是多个表,那么是否为窗口中的所有元素的每个表写一个?还是每个元素一次,如每个表的元素可能不同?
由于您使用PubSub作为源,因此您的工作似乎是流式作业。因此,默认的插入方法是STREAMING_INSERTS
(请参阅docs)。我看不出使用此方法减少写入的任何益处或理由,因为Billig基于数据的大小。顺便说一句,您的示例或多或少没有真正有效地减少写入。
尽管它是一个流作业,但是由于一些版本,FILE_LOADS
方法也受支持。如果withMethod
设置为FILE_LOADS
,则可以在withTriggeringFrequency
上定义BigQueryIO
。此设置定义加载作业发生的频率。在这里,连接器可以为您处理所有操作,您无需按键或窗口数据分组。将为每个表启动加载作业。
由于似乎需要一些时间才能将数据存储在BigQuery中,所以对您来说完全没问题,我建议使用FILE_LOADS
,因为加载是免费的,而不是流插入。定义触发频率时只需记住quotas。