使用Spark和Java 8获取并过滤多个列

问题描述 投票:1回答:1

好吧,我有一个包含4列,id,用户,数据(字符串)和日期的tsv文件。我想要做的是从最短的数据中获取用户,数据和日期。所以我试着这样做。

        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

         JavaRDD<String> lines = sparkContext
            .textFile(args[0]);

        JavaRDD<String> messages = lines
            .map(line -> line.split("\t+")[2]);
        JavaPairRDD<String, Integer> ones = messages
            .mapToPair(string -> new Tuple2<>(string,string.length()));
        JavaPairRDD<Integer, String> reverse = ones
            .mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1));
        JavaPairRDD<Integer, String> sorted = reverse
            .sortByKey(true)
            .cache();

        List<Tuple2<Integer, String>> output = sorted
            .take(1);

有了这个,我有最短的数据和他的长度,但现在我如何将其与用户和日期相关联?我不知道该怎么做......任何想法?

java apache-spark java-8
1个回答
1
投票

在使用Tuple2创建对rdd时,您可以将整个记录存储为第二个字段。在Pair Rdd中,将密钥设为列data的长度,值将是整个记录。然后在执行sortByKey之后,对rdd将根据键(列数据的长度)进行排序。在整个记录的值中,您可以使用tuple._2访问该记录,然后使用分隔符拆分并使用您需要的任何值。如下所示:

JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

 JavaRDD<String> lines = sparkContext
            .textFile(args[0]);

JavaPairRDD<Integer,String> pairedRdd = lines.mapToPair(lines -> new Tuple2((lines.split("\t+")[2]).length(), lines));

        JavaPairRDD<Integer, String> sortedRdd = pairedRdd.sortByKey(true);

        List<Tuple2<Integer, String>> output = sortedRdd.take(1);

        System.out.println(output.get(0)._1+" "+output.get(0)._2);

output.get(0)._2有完整的记录,与\t+分道扬..从数组中,获取所需的元素。

© www.soinside.com 2019 - 2024. All rights reserved.