Spark JavaRDD 程序读取 csv 和过滤器

问题描述 投票:0回答:2

如何使用

map
filter
函数使用 RDD 读取 csv 文件,并使用 csv 文件根据特定列进行选择?这是一个 csv 文件示例。

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount

1,2017-07-01 00:06:25,2017-07-01 00:10:50,1,1.20,1,N,249,90,1,5.5,0.5,0.5,1.35,0,0.3,8.15
1,2017-07-01 00:20:04,2017-07-01 00:21:38,2,.20,1,N,249,158,2,3,0.5,0.5,0,0,0.3,4.3
1,2017-07-01 00:44:10,2017-07-01 00:59:29,1,4.30,1,N,100,45,1,15.5,0.5,0.5,3.35,0,0.3,20.15
1,2017-07-01 00:07:33,2017-07-01 00:31:30,1,8.30,1,N,138,162,1,27,0.5,0.5,6.8,5.76,0.3,40.86

我尝试了下面的代码,但我不知道如何根据特定列进行过滤并获取那些相关行。

public class SparkUseCase{

    public static void main(String[] args) {    
    
        SparkConf conf = new SparkConf().setAppName("CSV Reader").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> allRows = sc.textFile("in/trip_yellow_taxi.data");
        System.out.println(allRows.take(5));
        List<String> headers = Arrays.asList(allRows.take(1).get(0).split(","));

        String field="VendorID";
        
        JavaRDD<String>dataWithoutHeaders = allRows.filter(x -> !(x.split(",")[headers.indexOf(field)]).equals(field));
        
        JavaRDD<Integer> VendorID = dataWithoutHeaders.map(x -> Integer.valueOf(x.split(",")[headers.indexOf(field)]));
                
        for (Integer i:VendorID.collect()){
            System.out.println(i);
        }
        
      }
}

感谢您的帮助。

用例:过滤所有RatecodeID为4的记录。

java apache-spark rdd
2个回答
1
投票

使用当前代码,您需要使用正确的字段名称,而不是

VendorID
...然后像您已经做的那样再次使用过滤器。只需检查不同的字段即可

rdd.filter(x -> x.split(",")[index].equals("4"));

但是,不要使用 Spark1 RDD 和

split(",")
的“穷人的 CSV 解析器”。

将 Spark2 csv 阅读器与 Dataframe 结合使用。

Scala 中的道歉示例

val spark = SparkSession.builder().getOrCreate() 
val df = spark
  .read()
  .format("csv")
  .option("header", "true")
  .load("in/trip_yellow_taxi.data")

val rates4 = df.filter("RatecodeID == 4")
rates4.show(false) 

0
投票

您还可以根据您的要求使用 SparkSession 读取 CSV 并查询数据。

SparkSession spark = SparkSession.builder().appName("CDX JSON Merge Job")
                .getOrCreate();
Dataset<Row> csvDataset = spark.read().format("csv").option("header", "true")
                .load("C:\\sample.csv");
csvDataset.createOrReplaceTempView("csvdataTable");
Dataset<Row> reducedCSVDataset = spark.sql("select VendorID from csvdataTable limit 2 ");
Dataset<String> rdds = reducedCSVDataset.toDF().select("VendorID").as(Encoders.STRING());
List<String> listOfStrings = rdds.collectAsList();
listOfStrings.forEach(x -> System.out.println(x));
© www.soinside.com 2019 - 2024. All rights reserved.