如何基于if条件跳过spark rdd map动作中的行

问题描述 投票:0回答:3

我有一个文件,我想给它一个mllib算法。所以我按照这个例子做了类似的事情:

val data = sc.textFile(my_file).
    map {line =>

        val parts = line.split(",");
        Vectors.dense(parts.slice(1, parts.length).map(x => x.toDouble).toArray)
};

除了有时我有一个缺失的功能,这个工作。有时,某行中的一列没有任何数据,我想扔掉这样的行。

所以我想做像这样的map{line => if(containsMissing(line) == true){ skipLine} else{ ... //same as before}}

我该怎么做skipLine动作?

scala apache-spark apache-spark-mllib
3个回答
1
投票

您可以使用filter函数来过滤掉这些行:

val data = sc.textFile(my_file)
   .filter(_.split(",").length == cols)
   .map {line =>
        // your code
   };

假设变量cols保存有效行中的列数。


1
投票

你可以使用flatMap,Some和None:

def missingFeatures(stuff): Boolean = ??? // Determine if features is missing

val data = sc.textFile(my_file)
  .flatMap {line =>
    val parts = line.split(",");
    if(missingFeatures(parts)) None
    else Some(Vectors.dense(parts.slice(1, parts.length).map(x => x.toDouble).toArray))
};

这样就可以避免多次映射rdd。


0
投票

从Spark RDD跳过空行/标题的Java代码:

首先进口:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;

现在,过滤器将总列数比较为17或以VendorID开头的标题列。

Function<String, Boolean> isInvalid = row -> (row.split(",").length == 17 && !(row.startsWith("VendorID")));
JavaRDD<String> taxis = sc.textFile("datasets/trip_yellow_taxi.data")
                        .filter(isInvalid);
© www.soinside.com 2019 - 2024. All rights reserved.