How do I add additional fields to the results of Beam's FileIO.matchAll()?


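I have a PCollection of KVs where the key is a GCS file_pattern and the value is some additional information about the files (for example, the "Source" system that generated them). For example: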

KV("gs://bucket1/dir1/*", "SourceX"),
KV("gs://bucket1/dir2/*", "SourceY")

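I need a PTransform that expands the file_patterns into all matching files in the GCS folders while keeping the "Source" field. For example, if there are two files (X1.dat, X2.dat) under dir1 and one file (Y1.dat) under dir2, the output would be: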

KV("gs://bucket1/dir1/X1.dat", "SourceX"),
KV("gs://bucket1/dir1/X2.dat", "SourceX"),
KV("gs://bucket1/dir2/Y1.dat", "SourceY")

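Can I achieve this with FileIO.matchAll()? I'm stuck on how to combine/join the "Source" field onto the matched files. Here is what I have tried; it is not quite there yet: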

public PCollection<KV<String, String>> expand(PCollection<KV<String, String>> filesAndSources) {
  return filesAndSources
      .apply("Get file names", Keys.create())
      .apply(FileIO.matchAll())
      .apply(FileIO.readMatches())
      .apply(ParDo.of(
          new DoFn<ReadableFile, KV<String, String>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
              ReadableFile file = c.element();
              String fileName = file.getMetadata().resourceId().toString();
              c.output(KV.of(fileName, XXXXX)); // How to get the value field ("Source") from the input KV?
            }
          }));
}

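My sticking point is the last line: for XXXXX, how do I get the value field ("Source") from the input KV? Is there any way to "join" or "combine" the value of the input KV back onto the "expanded" keys, given that one key (file_pattern) expands into multiple values?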

Thanks!

java google-cloud-platform google-cloud-dataflow apache-beam apache-beam-io
1 Answer

MatchResult.Metadata contains the resourceId that you are already using, but not the GCS path (with wildcards) that it matched.

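You can achieve what you want with a side input. To demonstrate this, I created the following filesAndSources (per your comment, this could be an input parameter, so it can't be hard-coded downstream):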

PCollection<KV<String, String>> filesAndSources = p.apply("Create file pattern and source pairs",
    Create.of(KV.of("gs://" + Bucket + "/sales/*", "Sales"),
              KV.of("gs://" + Bucket + "/events/*", "Events")));

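I materialize this as a side input (in this case as a Map). The key will be the glob pattern converted to a regular expression (thanks to this answer), and the value will be the source string: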

final PCollectionView<Map<String, String>> regexAndSources =
    filesAndSources.apply("Glob pattern to RegEx", ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        String glob = c.element().getKey();

        // Translate the glob character by character into an anchored regex.
        StringBuilder out = new StringBuilder("^");
        for (int i = 0; i < glob.length(); ++i) {
          final char ch = glob.charAt(i);
          switch (ch) {
            case '*': out.append(".*"); break;      // any run of characters
            case '?': out.append('.'); break;       // exactly one character
            case '.': out.append("\\."); break;     // literal dot
            case '\\': out.append("\\\\"); break;   // literal backslash
            default: out.append(ch);
          }
        }
        out.append('$');
        // Key: regex form of the pattern; value: the original source string.
        c.output(KV.of(out.toString(), c.element().getValue()));
      }
    })).apply("Save as Map", View.asMap());
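The conversion inside that DoFn can be tried on its own, outside a pipeline. The following is only a sketch of the same character-by-character translation; the class name GlobToRegex and the method globToRegex are hypothetical names introduced for illustration and are not part of Beam.

```java
public class GlobToRegex {
    // Hypothetical helper: same translation rules as the DoFn above.
    static String globToRegex(String glob) {
        StringBuilder out = new StringBuilder("^");
        for (int i = 0; i < glob.length(); ++i) {
            final char ch = glob.charAt(i);
            switch (ch) {
                case '*': out.append(".*"); break;      // any run of characters
                case '?': out.append('.'); break;       // exactly one character
                case '.': out.append("\\."); break;     // literal dot
                case '\\': out.append("\\\\"); break;   // literal backslash
                default: out.append(ch);
            }
        }
        return out.append('$').toString();
    }

    public static void main(String[] args) {
        String regex = globToRegex("gs://bucket1/dir1/*");
        System.out.println(regex);                                      // ^gs://bucket1/dir1/.*$
        System.out.println("gs://bucket1/dir1/X1.dat".matches(regex));  // true
        System.out.println("gs://bucket1/dir2/Y1.dat".matches(regex));  // false
    }
}
```

Two limitations of this translation are worth keeping in mind: other regex metacharacters that might appear in a path (such as '+' or '(') are passed through unescaped, and ".*" also matches '/', so the regex may accept more paths than the original glob did. Since the files have already been matched by FileIO.matchAll(), this should only matter when two patterns overlap.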

Then, after reading the file names, we can use the side input to check each path and see which pattern/source pair it matches:

filesAndSources
    .apply("Get file names", Keys.create())
    .apply(FileIO.matchAll())
    .apply(FileIO.readMatches())
    .apply(ParDo.of(new DoFn<ReadableFile, KV<String, String>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        ReadableFile file = c.element();
        String fileName = file.getMetadata().resourceId().toString();

        // The side input maps each regex (converted glob) to its source.
        Set<Map.Entry<String, String>> patternSet = c.sideInput(regexAndSources).entrySet();

        // Emit one (fileName, source) pair for every pattern the file matches.
        for (Map.Entry<String, String> pattern : patternSet) {
          if (fileName.matches(pattern.getKey())) {
            String source = pattern.getValue();
            c.output(KV.of(fileName, source));
          }
        }
      }
    }).withSideInputs(regexAndSources));

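Note that the glob-to-regex conversion is done before materializing the side input, rather than here, to avoid repeating the work for every file.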

The output, as I expected:

Feb 24, 2019 10:44:05 PM org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
INFO: Matched 2 files for pattern gs://REDACTED/events/*
Feb 24, 2019 10:44:05 PM org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
INFO: Matched 2 files for pattern gs://REDACTED/sales/*
Feb 24, 2019 10:44:05 PM com.dataflow.samples.RegexFileIO$3 processElement
INFO: key=gs://REDACTED/sales/sales1.csv, value=Sales
Feb 24, 2019 10:44:05 PM com.dataflow.samples.RegexFileIO$3 processElement
INFO: key=gs://REDACTED/sales/sales2.csv, value=Sales
Feb 24, 2019 10:44:05 PM com.dataflow.samples.RegexFileIO$3 processElement
INFO: key=gs://REDACTED/events/events1.csv, value=Events
Feb 24, 2019 10:44:05 PM com.dataflow.samples.RegexFileIO$3 processElement
INFO: key=gs://REDACTED/events/events2.csv, value=Events
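
The lookup performed in the last DoFn can also be exercised without a pipeline. This is a minimal sketch that assumes a plain java.util.Map standing in for the side input; the class name SourceLookup and the method sourcesFor are hypothetical, and the paths reuse the example from the question.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SourceLookup {
    // Hypothetical helper: returns the source of every regex that matches fileName.
    static List<String> sourcesFor(String fileName, Map<String, String> regexAndSources) {
        List<String> sources = new ArrayList<>();
        for (Map.Entry<String, String> entry : regexAndSources.entrySet()) {
            if (fileName.matches(entry.getKey())) {
                sources.add(entry.getValue());
            }
        }
        return sources;
    }

    public static void main(String[] args) {
        // Stand-in for the Map side input: regex form of each pattern -> source.
        Map<String, String> regexAndSources = new LinkedHashMap<>();
        regexAndSources.put("^gs://bucket1/dir1/.*$", "SourceX");
        regexAndSources.put("^gs://bucket1/dir2/.*$", "SourceY");

        System.out.println(sourcesFor("gs://bucket1/dir1/X1.dat", regexAndSources)); // [SourceX]
        System.out.println(sourcesFor("gs://bucket1/dir2/Y1.dat", regexAndSources)); // [SourceY]
        System.out.println(sourcesFor("gs://bucket1/other/Z.dat", regexAndSources)); // []
    }
}
```

As in the DoFn, a file whose path matches more than one pattern yields one entry per matching pattern, so overlapping patterns would produce several (file, source) pairs for the same file.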

Full code
