TextIO.read()。watchForNewFiles()阻止写入BigQuery

问题描述 投票:0回答:2

我正在尝试创建一个管道,等待GCS文件夹中的新csv文件来处理它们并将输出写入BigQuery。

我写了以下代码:

public static void main(String[] args) {

    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class));
    TableReference tableRef = new TableReference();
    tableRef.setProjectId(PROJECT_ID);
    tableRef.setDatasetId(DATASET_ID);
    tableRef.setTableId(TABLE_ID);
    //Pipeline p = Pipeline.create(PipelineOptionsFactory.as(Options.class));

    // Read files as they arrive in GS
    p.apply("ReadFile", TextIO.read()
        .from("gs://mybucket/*.csv")
        .watchForNewFiles(
            // Check for new files every 30 seconds
            Duration.standardSeconds(30),
            // Never stop checking for new files
            Watch.Growth.<String>never()
        )
    )
    .apply(ParDo.of(new DoFn<String, Segment>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String[] items = c.element().split(",");

            if (items[0].startsWith("_", 1)) {
                // Skip header (the header is starting with _comment)
                LOG.info("Skipped header");
                return;
            }

            Segment segment = new Segment(items);
            c.output(segment);
        }
    }))
    .apply(ParDo.of(new FormatSegment()))
    .apply(BigQueryIO.writeTableRows()
        .to(tableRef)
        .withSchema(FormatSegment.getSchema())
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

    // Run the pipeline.
    p.run();
}

如果我删除watchForNewFiles部分我的代码工作得很好(我看到关于写入GCS临时位置的并行化的INFO日志,最终输出写入BigQuery)。

但是如果我让watchForNewFiles(上面的代码)然后我只看到1个INFO日志(关于写入GCS临时位置)并且执行被卡住了。 BigQuery中没有更多日志,也没有错误和输出。

任何的想法?

google-cloud-dataflow apache-beam
2个回答
1
投票

看起来当使用waitForNewFiles()时,我们必须使用BigQueryIO.Write.Method.STREAMING_INSERTS方法写入BigQuery。

有效的代码现在是这样的:

.apply(BigQueryIO.writeTableRows()
        .to(tableRef)
        .withSchema(FormatSegment.getSchema())
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

1
投票

使用DataflowRunner我尝试使用此错误.. java.lang.UnsupportedOperationException:DataflowRunner当前不支持可拆分的DoFn:org.apache.beam.sdk.transforms.Watch$WatchGrowthFn@4a1691ac

对于直接跑步者,我看到它轮询,但其余的管道似乎没有发射并且没有错误。写入数据存储区和bigquery。

© www.soinside.com 2019 - 2024. All rights reserved.