How to load a CSV with nested columns using Apache Spark

Question (votes: 0, answers: 1)

I have a CSV file:

name,age,phonenumbers
Tom,20,"[{number:100200, area_code:555},{number:100300, area_code:444}]"
Harry,20,"[{number:100400, area_code:555},{number:100500, area_code:666}]"

How do I load this file in Spark into an RDD or Dataset of Person objects:

class Person {
    String name;
    Integer age;
    List<Phone> phonenumbers;

    class Phone {
        int number;
        int area_code; 
    }
}
csv apache-spark apache-spark-sql spark-dataframe
1 Answer (score: 1)

Unfortunately, the field names of the nested objects are not quoted in your sample. Is that really the case? Because if they were quoted (i.e., if the column held well-formed JSON), you could simply use the from_json function.
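For reference, a data row with quoted keys would look like the sketch below (doubled double quotes are the usual CSV escape for a quote inside a quoted field; note that Spark's CSV reader defaults to backslash escaping, so reading this style may additionally need .option("escape", "\"")):

name,age,phonenumbers
Tom,20,"[{""number"":100200, ""area_code"":555},{""number"":100300, ""area_code"":444}]"

With the column in that shape, from_json parses it directly: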

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._ // assumes a SparkSession named spark; enables the 'phonenumbers Column syntax

// The JSON column is an array of {number, area_code} structs
val schema = new ArrayType(new StructType()
  .add("number", IntegerType)
  .add("area_code", IntegerType), false)

// input is the DataFrame read from the CSV (see below)
val converted = input.withColumn("phones", from_json('phonenumbers, schema))
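
For completeness, the input referenced above is just the DataFrame read from the CSV; a minimal sketch, assuming a hypothetical file name people.csv and a header row:

// Hypothetical path; adjust to your data
val input = spark.read
  .option("header", "true")       // first line carries the column names
  .option("inferSchema", "true")  // so age is read as an integer, not a string
  .csv("people.csv")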

If that's not the case, you'll need your own logic to turn the string into actual nested objects, for example:

import org.apache.spark.sql.Row
import spark.implicits._ // provides the Encoder for the Person case class below

case class Phone(number: Int, area_code: Int)

case class Person(name: String, age: Int, phonenumbers: Array[Phone])

// Matches one entry such as {number:100200, area_code:555}
val phoneFormat = raw"\{number:(\d+), area_code:(\d+)\}".r

// Pattern matching on Row requires age to already be an Int,
// e.g. by reading the CSV with inferSchema enabled
val converted = input.map {
  case Row(name: String, age: Int, phonenumbers: String) =>
    val phones = for (m <- phoneFormat.findAllMatchIn(phonenumbers))
      yield Phone(m.group(1).toInt, m.group(2).toInt)
    Person(name, age, phones.toArray)
}
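
With this second variant, converted is a Dataset[Person], so both inspection and the RDD the question asks about are one call away; a quick usage sketch:

converted.printSchema()   // name, age, and an array of nested phone structs
converted.show(false)     // print the parsed rows without truncation

// A Dataset[Person] converts directly to the RDD[Person] the question mentions
val peopleRdd = converted.rdd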