使用Apache Spark在多个对子上有效地进行卡方积的计算。

问题描述 投票:1回答:1

旧语境

我正在对我的数据集进行一个计算,要求每个元素都与自身结合,即通过执行 mapToPair 关于 JavaPairRDD<Tuple2<String, List<Double>>, Tuple2<String, List<Double>>> 型。这种组合是用 cartesian 按照。

JavaPairRDD<String, List<Double>> keyvals;
...
JavaPairRDD<Tuple2<String, List<Double>>, Tuple2<String, List<Double>>> combined = keyvals.cartesian(keyvals).filter(tpl -> !tpl._1._1.equals(tpl._2._1));

combined.mapToPair(tpl -> {
   Tuple2<String, String> ids = new Tuple2<>(tpl._1._1, tpl._2._1);

    double result = calculateResult(tpl._1._2, tpl._2._2);

    return new Tuple2<>(ids, result);
}).filter(tpl -> tpl._2 > threshold).saveAsTextFile("result");

新的背景

我现在已经扩展了这个方法 calculateResult 接受三个 List<Double> 类型(而不是上面例子中的两个)。这就要求数据集与自身结合两次。不过在这里,。cartesian 似乎有所欠缺。

我的问题是:我怎样才能把我的数据(keyvals)与自己两次,本质上产生的东西与 JavaPairRDD<Tuple2<...>, Tuple2<...>, Tuple2<...>> (psuedocode)。

我的目标是调用这个方法 calculateResult(List<Double> s1, List<Double> s2 ,List<Double> s3) 在每个交叉组合对上。我想,我试图用笛卡尔扩展上面给出的例子,可能没有采取正确的方法,但我不知道什么才是正确的前进步骤。

不幸的是,我受限于只能使用Spark Java 2.4.x。

java apache-spark rdd spark-java
1个回答
1
投票

希望对你有所帮助

我已经添加了代码内联注释来描述我正在努力做的事情我已经添加了目的。List 而不是 Tuple3 如果您需要进行更多的 catesian joins

JavaPairRDD<List<String>, List<List<Double>>> result =
                keyvals.cartesian(keyvals)
                .filter(tpl -> !tpl._1._1.equals(tpl._2._1))
                        //Perform 3rd cartesian
                .cartesian(keyvals)
                        //Skip the common ids from 1st and 3rd keyvals
                .filter(tpl -> !tpl._1._1._1.equals(tpl._2._1))
                        //Map the result top Pair of Ids:List<String> and Values:List<List<Double>>
                .mapToPair((PairFunction<Tuple2<Tuple2<Tuple2<String, List<Double>>, Tuple2<String, List<Double>>>, Tuple2<String, List<Double>>>, List<String>, List<List<Double>>>) tuple2Tuple2Tuple2 -> {

                    //Combine Ids to single List
                    List<String> keys = new ArrayList<>();
                    keys.add(tuple2Tuple2Tuple2._1._1._1);
                    keys.add(tuple2Tuple2Tuple2._1._2._1);
                    keys.add(tuple2Tuple2Tuple2._2._1);

                    //Combine values to single List
                    List<List<Double>> values = new ArrayList<>();
                    values.add(tuple2Tuple2Tuple2._1._1._2);
                    values.add(tuple2Tuple2Tuple2._1._2._2);
                    values.add(tuple2Tuple2Tuple2._2._2);

                    //Return tuple of List of Ids and List of Values which are of fixed size 3
                    return new Tuple2<>(keys,values);
                });

        result.mapToPair(tpl -> {
            Tuple3<String, String,String> ids = new Tuple3<>(tpl._1.get(0), tpl._1.get(1), tpl._1.get(2));
            double result = calculateResult(tpl._2.get(0), tpl._2.get(1),tpl._2.get(2));
            return new Tuple2<>(ids, result);
        }).filter(tpl -> tpl._2 > threshold).saveAsTextFile("result");

注意:将Spark Java代码迁移到Spark java DataFrames将缩短你的代码并降低复杂性。

© www.soinside.com 2019 - 2024. All rights reserved.