Compare two CSV files using Apache Beam Java


My task is to compare two CSV files using Apache Beam Java. My files look like this:

File 1 (IDs)

1000672801
1000676083
1000686637
1000680800

File 2 (reference images)

gs://productimages/p_1000672801.jpg,p_1000672801,product_set,1000672801,general-v1,12V Incandescent Wedge Bulbs,,
gs://productimages/p_1000676083.jpg,p_1000676083,product_set,1000676083,general-v1,12V Incandescent Wedge Bulbs,,
gs://productimages/p_1000686637.jpg,p_1000686637,product_set,1000686637,general-v1,Skylark Contour Single Pole/3-Way dimmable CFL,,
gs://productimages/p_1000680800.jpg,p_1000680800,product_set,1000680800,general-v1,Splice Connector,,

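I need to compare each ID in file 1 against the IDs in file 2. When an ID matches, I need to take the entire row that contains it and save that row to an output file. For example, ID 1000672801 is found in the first reference-image row, so I would copy that row to the output file. If the ID is not found in a row, the search continues with the next row, until the end of the file.

This is the code I came up with: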
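
Independent of Beam, the matching rule described above can be sketched in plain Java. This is only an illustration under stated assumptions: the ID is taken to be the fourth comma-separated field of each file 2 row (as in the sample data), fields are assumed not to contain quoted commas, and the class and method names (`IdMatcher`, `idOf`, `matchingRows`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class IdMatcher {
    // Assumption: the ID is the fourth comma-separated field (index 3) of a file 2 row.
    static final int ID_COLUMN = 3;

    // Returns the ID field of a row, or null if the row has too few fields.
    static String idOf(String row) {
        String[] fields = row.split(",", -1); // -1 keeps trailing empty fields
        return fields.length > ID_COLUMN ? fields[ID_COLUMN].trim() : null;
    }

    // Returns every row of file 2 whose ID field appears in the list of file 1 IDs.
    static List<String> matchingRows(List<String> ids, List<String> rows) {
        Set<String> wanted = new HashSet<>();
        for (String id : ids) {
            String trimmed = id.trim();
            if (!trimmed.isEmpty()) {
                wanted.add(trimmed);
            }
        }
        List<String> matches = new ArrayList<>();
        for (String row : rows) {
            String id = idOf(row);
            if (id != null && wanted.contains(id)) {
                matches.add(row); // keep the whole row, not just the ID
            }
        }
        return matches;
    }
}
```

Putting the IDs in a set means each file 2 row needs a single lookup, rather than a scan over all of file 1 for every row.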

public class CsvComparer {
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();

        // Read the first CSV file
        PCollection<String> file1 = pipeline.apply(TextIO.read().from("gs://my-bucket/file1.csv"));

        // Read the second CSV file
        PCollection<String> file2 = pipeline.apply(TextIO.read().from("gs://my-bucket/file2.csv"));

        // Compare the data in the files and write the results to a file
        PCollection<String> results = file1.apply(ParDo.of(new CompareCsvFn(file2)));
        results.apply(TextIO.write().to("gs://my-bucket/comparison_results.csv").withSuffix(".csv"));

        pipeline.run().waitUntilFinish();
    } 

    public static class CompareCsvFn extends DoFn<String, String> {
        private final PCollection<String> file2; 

        public CompareCsvFn(PCollection<String> file2) {
            this.file2 = file2;
        } 

        @ProcessElement
        public void processElement(@Element String line, OutputReceiver<String> out) {
            // Compare the line with the lines in file2
            PCollection<KV<String, String>> matches = file2.apply(ParDo.of(new CompareLinesFn(line))); 

            // Write the matching lines to the output
            matches.apply(ParDo.of(new FormatOutputFn(line))).apply(out);
        }
    } 

    public static class CompareLinesFn extends DoFn<String, KV<String, String>> {
        private final String line; 

        public CompareLinesFn(String line) {
            this.line = line;
        } 

        @ProcessElement
        public void processElement(@Element String otherLine, OutputReceiver<KV<String, String>> out) {
            // Compare the lines and output a KV pair if they match
            // Note: I understand this logic needs to change, since one line in file1
            // has to be compared with all the lines in file2.
            if (line.equals(otherLine)) {
                out.output(KV.of(line, otherLine));
            }
        }
    } 

    public static class FormatOutputFn extends DoFn<KV<String, String>, String> {
        private final String line; 

        public FormatOutputFn(String line) {
            this.line = line;
        } 

        @ProcessElement
        public void processElement(@Element KV<String, String> match, OutputReceiver<String> out) {
            // Format the output
            String output = String.format("Line in file 1: %s\nLine in file 2: %s\n", line, match.getValue());
            out.output(output);
        }
    }
}

Please help me with this problem and suggest an approach. I have searched the web and elsewhere without success.

java apache-beam airflow-2.x
1 Answer

You can use KeyedPCollectionTuple to join the two files. This may only be a starting point, but take a look at: https://beam.apache.org/documentation/pipelines/design-your-pipeline/
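
To make the suggestion concrete: the code in the question cannot work as posted, because a PCollection cannot be applied to transforms from inside a DoFn's @ProcessElement method. A join avoids this by keying both inputs by ID and grouping them with CoGroupByKey. The following is a rough sketch rather than a tested pipeline. It assumes the ID is the fourth comma-separated field of each file 2 row, that fields contain no quoted commas, and it reuses the bucket paths from the question. The class name `CsvJoin` is made up for illustration.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptors;

public class CsvJoin {
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();

        // Tags that identify each input inside the grouped result.
        final TupleTag<String> idTag = new TupleTag<>();
        final TupleTag<String> rowTag = new TupleTag<>();

        // File 1: one ID per line, keyed by the ID itself.
        PCollection<KV<String, String>> ids = pipeline
            .apply("ReadIds", TextIO.read().from("gs://my-bucket/file1.csv"))
            .apply("DropBlankIds", Filter.by((String line) -> !line.trim().isEmpty()))
            .apply("KeyIds", MapElements
                .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
                .via((String line) -> KV.of(line.trim(), line.trim())));

        // File 2: full rows, keyed by the fourth field (assumed to hold the ID).
        PCollection<KV<String, String>> rows = pipeline
            .apply("ReadRows", TextIO.read().from("gs://my-bucket/file2.csv"))
            .apply("DropShortRows", Filter.by((String line) -> line.split(",", -1).length > 3))
            .apply("KeyRows", MapElements
                .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
                .via((String line) -> KV.of(line.split(",", -1)[3].trim(), line)));

        // Group both inputs by ID.
        PCollection<KV<String, CoGbkResult>> joined = KeyedPCollectionTuple
            .of(idTag, ids)
            .and(rowTag, rows)
            .apply("JoinById", CoGroupByKey.create());

        // Emit the file 2 rows only for IDs that also appear in file 1.
        joined
            .apply("KeepMatches", ParDo.of(new DoFn<KV<String, CoGbkResult>, String>() {
                @ProcessElement
                public void processElement(
                        @Element KV<String, CoGbkResult> element, OutputReceiver<String> out) {
                    CoGbkResult grouped = element.getValue();
                    if (grouped.getAll(idTag).iterator().hasNext()) {
                        for (String row : grouped.getAll(rowTag)) {
                            out.output(row);
                        }
                    }
                }
            }))
            .apply("WriteMatches",
                TextIO.write().to("gs://my-bucket/comparison_results").withSuffix(".csv"));

        pipeline.run().waitUntilFinish();
    }
}
```

With this setup the output is typically written as several sharded files that share the given prefix and suffix. If file 1 is small enough to fit in memory, passing its IDs to a ParDo over file 2 as a side input would be another option.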
