My task is to compare two CSV files using the Apache Beam Java SDK. My files look like this:
File 1 (IDs)
1000672801
1000676083
1000686637
1000680800
File 2 (reference images)
gs://productimages/p_1000672801.jpg,p_1000672801,product_set,1000672801,general-v1,12V Incandescent Wedge Bulbs,,
gs://productimages/p_1000676083.jpg,p_1000676083,product_set,1000676083,general-v1,12V Incandescent Wedge Bulbs,,
gs://productimages/p_1000686637.jpg,p_1000686637,product_set,1000686637,general-v1,Skylark Contour Single Pole/3-Way dimmable CFL,,
gs://productimages/p_1000680800.jpg,p_1000680800,product_set,1000680800,general-v1,Splice Connector,,
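I have to compare each ID in file 1 against the IDs in file 2. When an ID matches, I have to take the entire row that contains it and save that row to an output file. For example, ID 1000672801 is found in the first reference image row, so I copy that row to the output file. If the ID is not found in a row, the search continues with the next row, until the end of the file.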
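To make the rule concrete, this is roughly the behavior I want, written as plain Java with no Beam involved. It is only a sketch: the class and method names are made up for illustration, and it assumes the ID is always the fourth comma-separated field of a file 2 row and that no field contains a quoted comma.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class IdMatchSketch {

    /**
     * Returns the rows of file 2 whose ID column appears in file 1.
     * Assumption: the ID is the 4th comma-separated field of each row.
     */
    static List<String> matchingRows(List<String> ids, List<String> rows) {
        // Collect the wanted IDs once so each row needs only a set lookup.
        Set<String> wanted = new HashSet<>();
        for (String id : ids) {
            String trimmed = id.trim();
            if (!trimmed.isEmpty()) {
                wanted.add(trimmed);
            }
        }

        List<String> matches = new ArrayList<>();
        for (String row : rows) {
            // Limit -1 keeps the trailing empty fields of rows ending in ",,".
            String[] fields = row.split(",", -1);
            if (fields.length > 3 && wanted.contains(fields[3].trim())) {
                matches.add(row); // keep the whole row, not just the ID
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("1000672801", "1000680800");
        List<String> rows = Arrays.asList(
            "gs://productimages/p_1000672801.jpg,p_1000672801,product_set,1000672801,general-v1,12V Incandescent Wedge Bulbs,,",
            "gs://productimages/p_1000676083.jpg,p_1000676083,product_set,1000676083,general-v1,12V Incandescent Wedge Bulbs,,");
        for (String row : matchingRows(ids, rows)) {
            System.out.println(row);
        }
    }
}
```

What I cannot work out is how to express the same lookup as a Beam pipeline.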
The code I came up with is as follows:
    public class CsvComparer {
        public static void main(String[] args) {
            Pipeline pipeline = Pipeline.create();
            // Read the first CSV file
            PCollection<String> file1 = pipeline.apply(TextIO.read().from("gs://my-bucket/file1.csv"));
            // Read the second CSV file
            PCollection<String> file2 = pipeline.apply(TextIO.read().from("gs://my-bucket/file2.csv"));
            // Compare the data in the files and write the results to a file
            PCollection<String> results = file1.apply(ParDo.of(new CompareCsvFn(file2)));
            results.apply(TextIO.write().to("gs://my-bucket/comparison_results.csv").withSuffix(".csv"));
            pipeline.run().waitUntilFinish();
        }

        public static class CompareCsvFn extends DoFn<String, String> {
            private final PCollection<String> file2;

            public CompareCsvFn(PCollection<String> file2) {
                this.file2 = file2;
            }

            @ProcessElement
            public void processElement(@Element String line, OutputReceiver<String> out) {
                // Compare the line with the lines in file2
                PCollection<KV<String, String>> matches = file2.apply(ParDo.of(new CompareLinesFn(line)));
                // Write the matching lines to the output
                matches.apply(ParDo.of(new FormatOutputFn(line))).apply(out);
            }
        }

        public static class CompareLinesFn extends DoFn<String, KV<String, String>> {
            private final String line;

            public CompareLinesFn(String line) {
                this.line = line;
            }

            @ProcessElement
            public void processElement(@Element String otherLine, OutputReceiver<KV<String, String>> out) {
                // Compare the lines and output a KV pair if they match
                // this logic would need to be changed I understand as I have to compare one line in file1 with all other lines in file2
                if (line.equals(otherLine)) {
                    out.output(KV.of(line, otherLine));
                }
            }
        }

        public static class FormatOutputFn extends DoFn<KV<String, String>, String> {
            private final String line;

            public FormatOutputFn(String line) {
                this.line = line;
            }

            @ProcessElement
            public void processElement(@Element KV<String, String> match, OutputReceiver<String> out) {
                // Format the output
                String output = String.format("Line in file 1: %s\nLine in file 2: %s\n", line, match.getValue());
                out.output(output);
            }
        }
    }
Please help me with this problem and suggest an approach. I have searched the web and elsewhere without finding anything useful.
You can join the two files on the ID with KeyedPCollectionTuple and CoGroupByKey. This may only be a starting point, but take a look: https://beam.apache.org/documentation/pipelines/design-your-pipeline/
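A minimal sketch of that join might look like the following. It is untested against your data and makes a few assumptions: the class name CsvIdJoin, the helper idOfRow, and the tag ids are made up for illustration; the ID is taken to be the fourth comma-separated field of each file 2 row; and no field contains a quoted comma. The bucket paths are the ones from your question.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptors;

public class CsvIdJoin {
    // Tags tell the join which input each grouped value came from.
    static final TupleTag<String> IDS = new TupleTag<>("ids");
    static final TupleTag<String> ROWS = new TupleTag<>("rows");

    /** Assumption: the ID is the 4th comma-separated field of a file 2 row. */
    static String idOfRow(String row) {
        String[] fields = row.split(",", -1);
        return fields.length > 3 ? fields[3].trim() : "";
    }

    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // File 1: each line is an ID, so the line is its own key.
        PCollection<KV<String, String>> ids = pipeline
            .apply("ReadIds", TextIO.read().from("gs://my-bucket/file1.csv"))
            .apply("KeyIds", MapElements
                .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
                .via((String line) -> KV.of(line.trim(), line.trim())));

        // File 2: key each full row by its ID column.
        PCollection<KV<String, String>> rows = pipeline
            .apply("ReadRows", TextIO.read().from("gs://my-bucket/file2.csv"))
            .apply("KeyRows", MapElements
                .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
                .via((String row) -> KV.of(idOfRow(row), row)));

        // Group both inputs by ID.
        PCollection<KV<String, CoGbkResult>> joined = KeyedPCollectionTuple
            .of(IDS, ids)
            .and(ROWS, rows)
            .apply("JoinOnId", CoGroupByKey.create());

        joined
            .apply("KeepMatches", ParDo.of(new DoFn<KV<String, CoGbkResult>, String>() {
                @ProcessElement
                public void processElement(
                        @Element KV<String, CoGbkResult> element, OutputReceiver<String> out) {
                    // Skip blank lines and rows that had no ID column.
                    if (element.getKey().isEmpty()) {
                        return;
                    }
                    CoGbkResult grouped = element.getValue();
                    // Emit the file 2 rows only when the ID also occurs in file 1.
                    if (grouped.getAll(IDS).iterator().hasNext()) {
                        for (String row : grouped.getAll(ROWS)) {
                            out.output(row);
                        }
                    }
                }
            }))
            .apply("Write", TextIO.write().to("gs://my-bucket/comparison_results").withSuffix(".csv"));

        pipeline.run().waitUntilFinish();
    }
}
```

The main difference from your attempt is that the comparison is expressed as a transform over both PCollections when the pipeline is built. A DoFn only sees one element at a time, so it cannot hold a PCollection or apply transforms to one inside processElement. If file 1 is small enough to fit in memory, passing it to the file 2 ParDo as a side input is another common option.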