我正在对我的数据集进行一个计算,要求每个元素都与自身结合,即通过执行 mapToPair
关于 JavaPairRDD<Tuple2<String, List<Double>>, Tuple2<String, List<Double>>>
型。这种组合是用 cartesian
按照。
JavaPairRDD<String, List<Double>> keyvals;
...
JavaPairRDD<Tuple2<String, List<Double>>, Tuple2<String, List<Double>>> combined = keyvals.cartesian(keyvals).filter(tpl -> !tpl._1._1.equals(tpl._2._1));
combined.mapToPair(tpl -> {
Tuple2<String, String> ids = new Tuple2<>(tpl._1._1, tpl._2._1);
double result = calculateResult(tpl._1._2, tpl._2._2);
return new Tuple2<>(ids, result);
}).filter(tpl -> tpl._2 > threshold).saveAsTextFile("result");
我现在已经扩展了这个方法 calculateResult
接受三个 List<Double>
类型(而不是上面例子中的两个)。这就要求数据集与自身结合两次。不过在这里,。cartesian
似乎有所欠缺。
我的问题是:我怎样才能把我的数据(keyvals
)与自己两次,本质上产生的东西与 JavaPairRDD<Tuple2<...>, Tuple2<...>, Tuple2<...>>
(psuedocode)。
我的目标是调用这个方法 calculateResult(List<Double> s1, List<Double> s2 ,List<Double> s3)
在每个交叉组合对上。我想,我试图用笛卡尔扩展上面给出的例子,可能没有采取正确的方法,但我不知道什么才是正确的前进步骤。
不幸的是,我受限于只能使用Spark Java 2.4.x。
希望对你有所帮助
我已经添加了代码内联注释来描述我正在努力做的事情我已经添加了目的。List
而不是 Tuple3
如果您需要进行更多的 catesian joins
JavaPairRDD<List<String>, List<List<Double>>> result =
keyvals.cartesian(keyvals)
.filter(tpl -> !tpl._1._1.equals(tpl._2._1))
//Perform 3rd cartesian
.cartesian(keyvals)
//Skip the common ids from 1st and 3rd keyvals
.filter(tpl -> !tpl._1._1._1.equals(tpl._2._1))
//Map the result top Pair of Ids:List<String> and Values:List<List<Double>>
.mapToPair((PairFunction<Tuple2<Tuple2<Tuple2<String, List<Double>>, Tuple2<String, List<Double>>>, Tuple2<String, List<Double>>>, List<String>, List<List<Double>>>) tuple2Tuple2Tuple2 -> {
//Combine Ids to single List
List<String> keys = new ArrayList<>();
keys.add(tuple2Tuple2Tuple2._1._1._1);
keys.add(tuple2Tuple2Tuple2._1._2._1);
keys.add(tuple2Tuple2Tuple2._2._1);
//Combine values to single List
List<List<Double>> values = new ArrayList<>();
values.add(tuple2Tuple2Tuple2._1._1._2);
values.add(tuple2Tuple2Tuple2._1._2._2);
values.add(tuple2Tuple2Tuple2._2._2);
//Return tuple of List of Ids and List of Values which are of fixed size 3
return new Tuple2<>(keys,values);
});
result.mapToPair(tpl -> {
Tuple3<String, String,String> ids = new Tuple3<>(tpl._1.get(0), tpl._1.get(1), tpl._1.get(2));
double result = calculateResult(tpl._2.get(0), tpl._2.get(1),tpl._2.get(2));
return new Tuple2<>(ids, result);
}).filter(tpl -> tpl._2 > threshold).saveAsTextFile("result");
注意:将Spark Java代码迁移到Spark java DataFrames将缩短你的代码并降低复杂性。