我有一个数据流管道,并且我正在解析一个文件,如果我得到任何不正确的记录,那么我会将其写入 GCS 存储桶,但是当输入文件数据中没有错误时,TextIO 仍然会在 GCS 上写入空文件带标题的桶。
那么,如果 PCollection 大小为零,然后跳过此步骤,我们如何防止这种情况发生?
errorRecords.apply("WritingErrorRecords", TextIO.write().to(options.getBucketPath())
.withHeader("ID|ERROR_CODE|ERROR_MESSAGE")
.withoutSharding()
.withSuffix(".txt")
.withShardNameTemplate("-SSS")
.withNumShards(1));
TextIO.write()
始终写入至少一个分片,即使它是空的。无论如何,当您写入单个分片时,您可以通过在 DoFn 中手动执行写入来解决此行为,该 DoFn 将要写入的元素作为侧面输入,例如
PCollectionView<List<String>> errorRecordsView = errorRecords.apply(
View.<String>asList());
// Your "main" PCollection is a PCollection with a single input,
// so the DoFn will get invoked exactly once.
p.apply(Create.of(new String[]{"whatever"}))
// The side input is your error records.
.apply(ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(
@Element String unused,
OutputReceiver<String> out,
ProcessContext c) {
List<String> errors = c.sideInput(errorRecordsView);
if (!errors.isEmpty()) {
// Open the file manually and write all the errors to it.
}
}
}).withSideInputs(errorRecordsView);
能够使用本机 Beam 写入来做到这一点是一个合理的请求。在最新版本的 Beam 中,通过设置skipIfEmpty 不支持此功能。
Beam TextIO 在 2.40.0 中添加了对
skipIfEmpty()
的支持,请参阅:https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/TextIO.TypedWrite.html#skipIfEmpty --