从可变长度CSV到对RDD的Spark转换

问题描述 投票:2回答:3

我是scala spark的新手,我有一个CSV文件,如下所示。

R001, A, 10, C, 30, D, 50, X, 90
R002, E, 40, F, 70, G, 80, H, 90, J, 25 
R003, L, 30, M, 54, N, 67, O, 25, P, 85, Q, 100

如何将其转换为配对RDD,将行的第一个值作为键并跳过数值?

R001, A
R001, C
R001, D
R001, X
R002, E
R002, F
R002, G
R002, H
R002, J
R003, L
R003, M
R003, N
R003, O
R003, P
R003, Q

我试过这个看起来很好,但我认为有更好的方法来做到这一点。

def isNumeric(str:String): Boolean = str.matches("[-+]?\\d+(\\.\\d+)?")

def process(field: Array[String]): Array[String] = { 
 val results = new Array[String](field.length)  
 for (i <- 1 to field.length-1) {
    if(!isNumeric(field(i).trim) && field(0)!=null && field(i)!=null)
     results(i)= field(0)+":"+field(i)
 }  
  results  
};

 def skipNulls(input : String) : String = {  
  if(input!=null && input.trim!="" && input.contains(":")) {
  var res = input.split(":")    
  res(0)+","+res(1)
  }  else {
    "null"
  }
};


val a= raw_csv.map(_.split(",")).flatMap(k => process(k))
val b= a.map(k => skipNulls(k))
val c = b.filter( x => x.contains("null")==false)
val d= c.toDF()
d.show
display(d)
scala apache-spark rdd
3个回答
2
投票

看起来你很大程度上对Spark部分有正确的想法(看起来你看起来并不像你想要的RDD那样?我假设你知道这一点)问题是我们是否可以清理processskipNulls 。因为你正在使用flatMap,我认为你应该能够过滤掉process中的坏点。

如果我正确理解,你会引入空值(然后在以后删除它们),因为你有这个固定长度的数组。但是,我们可以分离“键”和“值”,过滤到非数字值,然后组装我们想要的对而无需创建此数组。就像是:

def process(line: String): List[(String, String)] = {
  val key :: values = line.split(",").toList
  values.filterNot(isNumeric).map(key -> _) // equivalent to .map(x => (key, x))
}

这结合了你的前几个步骤,所以我的版本将是raw_csv.flatMap(process)来到RDD[(String, String)]

拆分列表可能有点神奇。以下也可以:

val elements = line.split(",")
val key = elements.head
val values = elements.tail

您可以将isNumeric重写为:

def isNumeric(s: String): Boolean = Try(s.toFloat).isSuccess

我对这是否比正则表达更好或更差没有强烈的意见。 Tryscala.util


1
投票

霍伊兰发布了答案。

我只是再次提供完整的代码

val raw_csv = sc.parallelize(Array(R001, A, 10, C, 30, D, 50, X, 90
R002, E, 40, F, 70, G, 80, H, 90, J, 25 
R003, L, 30, M, 54, N, 67, O, 25, P, 85, Q, 100))

import scala.util.{Try, Success, Failure}
def isNumeric(s: String): Boolean = Try(s.toFloat).isSuccess

def process(line: String): List[(String, String)] = {
  val key :: values = line.split(",").toList
  values.filterNot(isNumeric).map(key -> _) 
}

val processed = raw_csv.flatMap(k => process(k))
display(processed.toDF)

结果如下

R001, A
R001, C
R001, D
R001, X
R002, E
R002, F
R002, G
R002, H
R002, J
R003, L
R003, M
R003, N
R003, O
R003, P
R003, Q

1
投票

如果你想要一个不同的方法,你可以通过首先将csv文本的每一行转换为两个字符串来简化解决方案:键和行的其余部分(因此绕过可变长度问题)。

步骤:1)通过在第一个","处拆分,将源文本文件的每一行划分为一个键字符串和余数字符串数组,2)使用replaceAllIn()删除剩余字符串中的所有数字字段并将其拆分为数组,并且,3)将(键,数组)元素展平为所需的键值对。

val numericField = """\s*[-+]?\d+(\.\d+)?\s*,?""".r

sc.textFile("/path/to/csvFile").
  map( _.split(",\\s*", 2) ).
  map{ case Array(key, remainder) =>
    (key, numericField.replaceAllIn(remainder, "").split(",\\s*")) }.
  flatMap{ case (k, arr) => arr.map( (k, _) ) }
// res1: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[21] ...

res1.collect
// res2: Array[(String, String)] = Array(
//   (R001,A), (R001,C), (R001,D), (R001,X),
//   (R002,E), (R002,F), (R002,G), (R002,H), (R002,J),
//   (R003,L), (R003,M), (R003,N), (R003,O), (R003,P), (R003,Q)
// )
© www.soinside.com 2019 - 2024. All rights reserved.