如何从csv中读取Spark SQL UserDefinedType

问题描述 投票:1回答:1

我试图定义一个基于String的UserDefinedType但不同于Spark 2.4.1中的StringType,但它看起来像Spark中有一个错误或我正在做错误的smth。

我将我的类型定义如下:

class MyType extends UserDefinedType[MyValue] {
  override def sqlType: DataType = StringType
  ...
}

@SQLUserDefinedType(udt = classOf[MyType])
case class MyValue

我希望它只需一个自定义SQL类型就可以读取并存储为String。实际上Spark根本无法读取字符串:

java.lang.ClassCastException: org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$11 cannot be cast to org.apache.spark.unsafe.types.UTF8String
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getUTF8String(rows.scala:46)
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:195)
    at org.apache.spark.sql.catalyst.expressions.JoinedRow.getUTF8String(JoinedRow.scala:102)

我认为问题在于UnivocityParser.makeConverter在UDT的情况下不返回(String => Any)函数但是(String =>(String => Any))

scala apache-spark apache-spark-sql
1个回答
1
投票

看起来它确实是Spark中的一个错误。我查看了Spark 2.4.1源代码,发现了以下内容:

case udt: UserDefinedType[_] => (datum: String) =>
  makeConverter(name, udt.sqlType, nullable, options)

将此更改为

case udt: UserDefinedType[_] => 
  makeConverter(name, udt.sqlType, nullable, options)

解决了我的问题。为Spark提出了一个问题:https://issues.apache.org/jira/browse/SPARK-27591

© www.soinside.com 2019 - 2024. All rights reserved.