How to replace existing rows in BigQuery from an Apache Beam Java Dataflow pipeline


Currently, I have a BigQuery table named cdn_daily_user_playback_requests_1MONTH. It holds a large amount of data based on daily records, so it contains whole months of data: 2023-07, 2023-08, and so on. Now, say I generate fresh data for 2023-07 and want to write it to this table, which already has records for 2023-07. How can I do this in my Apache Beam Java code, i.e. replace the table's current data for that month with my new data?

My pipeline code:

pipeline
            .apply("Read from cdn_requests BigQuery", BigQueryIO
                    .read(new CdnMediaRequestLogEntity.FromSchemaAndRecord())
                    .fromQuery(cdnRequestsQueryString)
                    .usingStandardSql())
            .apply("Validate and Filter Cdn Media Request Log Objects", Filter.by(new CdnMediaRequestValidator()))
            .apply("Convert Cdn Logs To Key Value Pairs", ParDo.of(new CdnMediaRequestResponseSizeKeyValuePairConverter()))
            .apply("Sum the Response Sizes By Key", Sum.longsPerKey())
            .apply("Convert To New Daily Requests Objects", ParDo.of(new CdnDailyRequestConverter(projectId, kind)))
            .apply("Convert Cdn Media Request Entities to Big Query Objects", ParDo.of(new BigQueryCdnDailyRequestRowConverter()))
            .apply("Write Data To BigQuery", BigQueryIO.writeTableRows()
                .to(writeCdnMediaRequestTable)
                .withSchema(cdnDailyRequestSchema)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

I did try and test BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE, but as I understand it, that deletes all of the data in the table before writing the newly created data. I only want to replace the data for the month 2023-07, not everything.
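
For a single, fixed day, one option is to write to a partition decorator directly: with a decorator, WRITE_TRUNCATE truncates only that partition rather than the whole table. A minimal sketch for illustration, not from the original pipeline — the project, dataset, and date are placeholder assumptions:

    // Hypothetical sketch: write to exactly one day partition via a
    // "$yyyyMMdd" decorator. "my-project", "my_dataset" and 20230701 are
    // placeholders. With the decorator, WRITE_TRUNCATE replaces only this
    // partition, not the whole table.
    .apply("Write Data To BigQuery", BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.cdn_daily_user_playback_requests_1MONTH$20230701")
            .withSchema(cdnDailyRequestSchema)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

A static decorator covers only one day, though; replacing a whole month of a day-partitioned table needs one destination per day, which is what the dynamic-destination solution below does.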

EDIT (update): I found a solution. It works by creating a SerializableFunction that uses each row's partition key as the table identifier when writing to BigQuery (my table is partitioned on a Date column of the DATE data type). The effect is that each write only touches the slice of the table belonging to that partition.

Here is the sample code of my solution:

import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TimePartitioning;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class BigQueryDayPartitionDestinations implements SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> {

    private final String projectId;
    private final String datasetId;
    private final String pattern;
    private final String table;

    public static BigQueryDayPartitionDestinations writePartitionsPerDay(String projectId, String datasetId, String tablePrefix) {
        // The trailing "$" turns the per-day suffix into a partition decorator (table$yyyyMMdd).
        return new BigQueryDayPartitionDestinations(projectId, datasetId, "yyyyMMdd", tablePrefix + "$");
    }

    private BigQueryDayPartitionDestinations(String projectId, String datasetId, String pattern, String table) {
        this.projectId = projectId;
        this.datasetId = datasetId;
        this.pattern = pattern;
        this.table = table;
    }

    @Override
    public TableDestination apply(ValueInSingleWindow<TableRow> input) {
        // One formatter renders the partition decorator suffix (yyyyMMdd),
        // the other parses the row's Date column (yyyy-MM-dd).
        DateTimeFormatter partition = DateTimeFormat.forPattern(pattern).withZone(DateTimeZone.forID("Asia/Tokyo"));
        DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd").withZone(DateTimeZone.forID("Asia/Tokyo"));

        TableReference reference = new TableReference();
        reference.setProjectId(this.projectId);
        reference.setDatasetId(this.datasetId);

        // Map the row's Date value to its day partition, e.g. "my_table$20230701".
        var date = input.getValue().get("Date").toString();
        DateTime dateTime = formatter.parseDateTime(date);

        var tableId = table + dateTime.toInstant().toString(partition);

        reference.setTableId(tableId);
        // Declare DAY partitioning on the Date column so the table is created
        // correctly if it does not exist yet.
        return new TableDestination(reference, null, new TimePartitioning().setType("DAY").setField("Date"));
    }
}
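
The function is passed to BigQueryIO's to() overload that accepts a SerializableFunction. Combined with WRITE_TRUNCATE, each load then replaces only the day partitions the incoming rows map to. A sketch of the wiring, assuming a datasetId variable and using the question's table name as the prefix:

    // Sketch: datasetId is an assumed variable; the prefix matches the
    // question's table. Each row is routed to its own day partition.
    .apply("Write Data To BigQuery", BigQueryIO.writeTableRows()
            .to(BigQueryDayPartitionDestinations.writePartitionsPerDay(
                    projectId, datasetId, "cdn_daily_user_playback_requests_1MONTH"))
            .withSchema(cdnDailyRequestSchema)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));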
1 Answer

Solution: create a SerializableFunction that uses each row's partition key as the table identifier when writing to BigQuery (the table is partitioned on a Date column of the DATE data type), so each write only replaces the matching partition. The full BigQueryDayPartitionDestinations implementation is shown in the edit above.
