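I have a PCollection of KVs in which the key is a GCS file_pattern and the value is some additional information about the files (e.g., the "Source" system that generated them). For example: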
KV("gs://bucket1/dir1/*", "SourceX"),
KV("gs://bucket1/dir2/*", "SourceY")
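I need a PTransform that expands each file_pattern into all matching files in the GCS folder while keeping the "Source" field. For example, if dir1 contains two files, X1.dat and X2.dat, and dir2 contains one file, Y1.dat, the output should be: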
KV("gs://bucket1/dir1/X1.dat", "SourceX"),
KV("gs://bucket1/dir1/X2.dat", "SourceX"),
KV("gs://bucket1/dir2/Y1.dat", "SourceY")
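Can I use FileIO.matchAll() to achieve this? I'm stuck on how to combine/join the "Source" field with the matched files. Here is what I've tried so far; it's not quite there yet: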
public PCollection<KV<String, String>> expand(PCollection<KV<String, String>> filesAndSources) {
  return filesAndSources
      .apply("Get file names", Keys.create())
      .apply(FileIO.matchAll())
      .apply(FileIO.readMatches())
      .apply(ParDo.of(
          new DoFn<ReadableFile, KV<String, String>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
              ReadableFile file = c.element();
              String fileName = file.getMetadata().resourceId().toString();
              c.output(KV.of(fileName, XXXXX)); // How to get the value field ("Source") from the input KV?
            }
          }));
}
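My difficulty is the last line: for XXXXX, how do I get the value field ("Source") from the input KV? Is there any way to "join" or "combine" the value of the input KV back onto the "expanded" keys, given that one key (file_pattern) expands into multiple values?
Thanks!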
MatchResult.Metadata contains the resourceId you are already using, but not the GCS path (with wildcards) that it was matched against.
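You can achieve what you want with a side input. To demonstrate this, I created the following filesAndSources (based on your comment, this could be an input parameter, so it can't be hard-coded downstream):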
PCollection<KV<String, String>> filesAndSources = p.apply("Create file pattern and source pairs",
Create.of(KV.of("gs://" + Bucket + "/sales/*", "Sales"),
KV.of("gs://" + Bucket + "/events/*", "Events")));
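I implement it as a side input (in this case a Map<String, String>). Its keys are the glob patterns converted into regular expressions (thanks to this answer), and its values are the source strings: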
final PCollectionView<Map<String, String>> regexAndSources =
    filesAndSources.apply("Glob pattern to RegEx", ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        String glob = c.element().getKey();
        StringBuilder out = new StringBuilder("^");
        for (int i = 0; i < glob.length(); ++i) {
          final char ch = glob.charAt(i);
          switch (ch) {
            case '*': out.append(".*"); break;
            case '?': out.append('.'); break;
            case '.': out.append("\\."); break;
            case '\\': out.append("\\\\"); break;
            default: out.append(ch);
          }
        }
        out.append('$');
        c.output(KV.of(out.toString(), c.element().getValue()));
      }
    })).apply("Save as Map", View.asMap());
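One caveat with the conversion above: `*` becomes `.*`, which also matches `/`, so a file in a subdirectory can match a shallower pattern, and regex metacharacters such as `+` or `(` are passed through unescaped. Below is a minimal plain-Java sketch of a stricter converter. It assumes GCS-style glob semantics in which `*` stays within one directory level, `**` may cross levels, and `?` matches one non-`/` character; every other character is quoted as a literal, and character classes like `[...]` are not handled. `GlobToRegex` is a hypothetical helper name, not a Beam class.

```java
import java.util.regex.Pattern;

/**
 * Hypothetical helper (not part of Beam): converts a GCS-style glob into an
 * anchored Java regex. Assumed semantics: '*' = any run of non-'/' chars,
 * '**' = any run of chars including '/', '?' = one non-'/' char.
 */
final class GlobToRegex {
  private GlobToRegex() {}

  static String toRegex(String glob) {
    StringBuilder out = new StringBuilder("^");
    for (int i = 0; i < glob.length(); i++) {
      char ch = glob.charAt(i);
      if (ch == '*') {
        if (i + 1 < glob.length() && glob.charAt(i + 1) == '*') {
          out.append(".*");    // '**': may cross directory levels
          i++;                 // consume the second '*'
        } else {
          out.append("[^/]*"); // '*': stays within one directory level
        }
      } else if (ch == '?') {
        out.append("[^/]");    // '?': exactly one non-'/' character
      } else {
        // Quote everything else so '.', '+', '(', '$', ... match literally.
        out.append(Pattern.quote(String.valueOf(ch)));
      }
    }
    return out.append('$').toString();
  }
}
```

Inside the DoFn, the loop could then be replaced by `c.output(KV.of(GlobToRegex.toRegex(c.element().getKey()), c.element().getValue()));`.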
Then, after reading the file names, we can use the side input to check each path and find the matching pattern/source pair:
filesAndSources
    .apply("Get file names", Keys.create())
    .apply(FileIO.matchAll())
    .apply(FileIO.readMatches())
    .apply(ParDo.of(new DoFn<ReadableFile, KV<String, String>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        ReadableFile file = c.element();
        String fileName = file.getMetadata().resourceId().toString();
        Set<Map.Entry<String, String>> patternSet = c.sideInput(regexAndSources).entrySet();
        // Emit one KV per pattern whose regex matches this file's full path.
        for (Map.Entry<String, String> pattern : patternSet) {
          if (fileName.matches(pattern.getKey())) {
            String source = pattern.getValue();
            c.output(KV.of(fileName, source));
          }
        }
      }
    }).withSideInputs(regexAndSources));
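Note that the glob-to-regex conversion is done before materializing the side input, rather than here, to avoid repeating the work for every file.
The output, as expected: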
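The same reasoning applies to the matching loop: `String.matches` recompiles its regex on every call, i.e. once per file per pattern. The following is a minimal plain-Java sketch, assuming the lookup can be pulled out into a helper (`SourceLookup` is a hypothetical name, not a Beam class), that compiles each pattern once and, like the loop, returns every source whose pattern matches, so overlapping patterns still yield one result each.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/**
 * Hypothetical helper (not part of Beam): compiles each regex from a
 * regex -> source map once, then looks up the sources for a file name.
 */
final class SourceLookup {
  /** One compiled pattern together with the source it maps to. */
  private static final class Rule {
    final Pattern pattern;
    final String source;

    Rule(Pattern pattern, String source) {
      this.pattern = pattern;
      this.source = source;
    }
  }

  private final List<Rule> rules = new ArrayList<>();

  SourceLookup(Map<String, String> regexToSource) {
    for (Map.Entry<String, String> e : regexToSource.entrySet()) {
      rules.add(new Rule(Pattern.compile(e.getKey()), e.getValue()));
    }
  }

  /** Every source whose pattern fully matches fileName, in map order; empty if none. */
  List<String> sourcesFor(String fileName) {
    List<String> sources = new ArrayList<>();
    for (Rule rule : rules) {
      if (rule.pattern.matcher(fileName).matches()) {
        sources.add(rule.source);
      }
    }
    return sources;
  }
}
```

Side inputs are read through the ProcessContext in @ProcessElement, not in @Setup, so one option is to build a SourceLookup from `c.sideInput(regexAndSources)` and cache it in a DoFn field; whether such caching is safe depends on how the side input is windowed, so treat it as an assumption to verify.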
Feb 24, 2019 10:44:05 PM org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
INFO: Matched 2 files for pattern gs://REDACTED/events/*
Feb 24, 2019 10:44:05 PM org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
INFO: Matched 2 files for pattern gs://REDACTED/sales/*
Feb 24, 2019 10:44:05 PM com.dataflow.samples.RegexFileIO$3 processElement
INFO: key=gs://REDACTED/sales/sales1.csv, value=Sales
Feb 24, 2019 10:44:05 PM com.dataflow.samples.RegexFileIO$3 processElement
INFO: key=gs://REDACTED/sales/sales2.csv, value=Sales
Feb 24, 2019 10:44:05 PM com.dataflow.samples.RegexFileIO$3 processElement
INFO: key=gs://REDACTED/events/events1.csv, value=Events
Feb 24, 2019 10:44:05 PM com.dataflow.samples.RegexFileIO$3 processElement
INFO: key=gs://REDACTED/events/events2.csv, value=Events