有人可以举一个在Java中正确使用mapPartitionsWithIndex的例子吗?我找到了很多Scala的例子,但是缺少Java的例子。 我的理解是否正确,使用此功能时,单独的分区将由单独的节点处理。
我收到以下错误
method mapPartitionsWithIndex in class JavaRDD<T> cannot be applied to given types;
JavaRDD<String> rdd = sc.textFile(filename).mapPartitionsWithIndex
required: Function2<Integer,Iterator<String>,Iterator<R>>,boolean
found: <anonymous Function2<Integer,Iterator<String>,Iterator<JavaRDD<String>>>>
做的时候
JavaRDD<String> rdd = sc.textFile(filename).mapPartitionsWithIndex(
new Function2<Integer, Iterator<String>, Iterator<JavaRDD<String>> >() {
@Override
public Iterator<JavaRDD<String>> call(Integer ind, String s) {
这是我用来删除 csv 文件第一行的代码:
JavaRDD<String> rawInputRdd = sparkContext.textFile(dataFile);
Function2 removeHeader= new Function2<Integer, Iterator<String>, Iterator<String>>(){
@Override
public Iterator<String> call(Integer ind, Iterator<String> iterator) throws Exception {
if(ind==0 && iterator.hasNext()){
iterator.next();
return iterator;
}else
return iterator;
}
};
JavaRDD<String> inputRdd = rawInputRdd.mapPartitionsWithIndex(removeHeader, false);
您也可以使用
mapPartitionsWithIndex
而不使用匿名函数代码。
JavaRDD<String> rawInputRdd = sparkContext.textFile(dataFile);
JavaRDD<String> inputRdd = rawInputRdd.mapPartitionsWithIndex((ind, iterator) -> {
if (ind == 0 && iterator.hasNext()) {
iterator.next();
return iterator;
} else
return iterator;
}, false);