当集合大小为0时,如何防止数据流管道中写入空文件?

问题描述 投票:0回答:2

我有一个数据流管道,并且我正在解析一个文件,如果我得到任何不正确的记录,那么我会将其写入 GCS 存储桶,但是当输入文件数据中没有错误时,TextIO 仍然会在 GCS 上写入空文件带标题的桶。

那么,如果 PCollection 大小为零,然后跳过此步骤,我们如何防止这种情况发生?

errorRecords.apply("WritingErrorRecords", TextIO.write().to(options.getBucketPath())
             .withHeader("ID|ERROR_CODE|ERROR_MESSAGE")
             .withoutSharding()
             .withSuffix(".txt")
             .withShardNameTemplate("-SSS")
             .withNumShards(1));
        
java google-cloud-dataflow apache-beam dataflow
2个回答
0
投票

TextIO.write()
始终写入至少一个分片,即使它是空的。无论如何,当您写入单个分片时,您可以通过在 DoFn 中手动执行写入来解决此行为,该 DoFn 将要写入的元素作为侧面输入,例如

PCollectionView<List<String>> errorRecordsView = errorRecords.apply(
    View.<String>asList());

// Your "main" PCollection is a PCollection with a single input,
// so the DoFn will get invoked exactly once. 
p.apply(Create.of(new String[]{"whatever"}))
 // The side input is your error records.
 .apply(ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(
          @Element String unused,
          OutputReceiver<String> out,
          ProcessContext c) {
        List<String> errors = c.sideInput(errorRecordsView);
        if (!errors.isEmpty()) {
          // Open the file manually and write all the errors to it.
        }
      }
  }).withSideInputs(errorRecordsView);

能够使用本机 Beam 写入来做到这一点是一个合理的请求。在最新版本的 Beam 中,通过设置skipIfEmpty 不支持此功能。


0
投票

Beam TextIO 在 2.40.0 中添加了对

skipIfEmpty()
的支持,请参阅:https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/TextIO.TypedWrite.html#skipIfEmpty --

© www.soinside.com 2019 - 2024. All rights reserved.