Apache Spark mapPartitionsWithIndex

问题描述 投票:0回答:2

有人可以举一个在Java中正确使用mapPartitionsWithIndex的例子吗?我找到了很多Scala的例子,但是缺少Java的例子。 我的理解是否正确,使用此功能时,单独的分区将由单独的节点处理。

我收到以下错误

method mapPartitionsWithIndex in class JavaRDD<T> cannot be applied to given types;
    JavaRDD<String> rdd = sc.textFile(filename).mapPartitionsWithIndex
    required: Function2<Integer,Iterator<String>,Iterator<R>>,boolean
    found: <anonymous Function2<Integer,Iterator<String>,Iterator<JavaRDD<String>>>>

做的时候

JavaRDD<String> rdd = sc.textFile(filename).mapPartitionsWithIndex(
    new Function2<Integer, Iterator<String>, Iterator<JavaRDD<String>> >() {

    @Override
    public Iterator<JavaRDD<String>> call(Integer ind, String s) { 
java mapreduce apache-spark
2个回答
10
投票

这是我用来删除 csv 文件第一行的代码:

JavaRDD<String> rawInputRdd = sparkContext.textFile(dataFile);

Function2 removeHeader= new Function2<Integer, Iterator<String>, Iterator<String>>(){
    @Override
    public Iterator<String> call(Integer ind, Iterator<String> iterator) throws Exception {
        if(ind==0 && iterator.hasNext()){
            iterator.next();
            return iterator;
        }else
            return iterator;
    }
};
JavaRDD<String> inputRdd = rawInputRdd.mapPartitionsWithIndex(removeHeader, false);

0
投票

您也可以使用

mapPartitionsWithIndex
而不使用匿名函数代码

   JavaRDD<String> rawInputRdd = sparkContext.textFile(dataFile);

    JavaRDD<String> inputRdd = rawInputRdd.mapPartitionsWithIndex((ind, iterator) -> {
      if (ind == 0 && iterator.hasNext()) {
        iterator.next();
        return iterator;
      } else
        return iterator;
    }, false);
© www.soinside.com 2019 - 2024. All rights reserved.