Spark CSV - 找不到实际参数的适用构造函数/方法

问题描述 投票:1回答:1

我在使用过滤器上的lambda函数和java spark应用程序中的类型化数据集的映射时遇到了问题。

我收到此运行时错误

ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 130, Column 126: No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public static java.sql.Date org.apache.spark.sql.catalyst.util.DateTimeUtils.toJavaDate(int)"

我使用下面的类和spark 2.2.0。 https://gitlab.com/opencell/test-bigdata中提供了样本数据的完整示例

Dataset<CDR> cdr = spark
            .read()
            .format("csv")
            .option("header", "true")
            .option("inferSchema", "true")
            .option("delimiter", ";")
            .csv("CDR_SAMPLE.csv")
            .as(Encoders.bean(CDR.class));

    long v = cdr.filter(x -> (x.timestamp != null && x.getAccess().length()>0)).count();

    System.out.println("validated entries :" + v);

CDR文件定义是gitlab link

编辑

val cdrCSVSchema = StructType(Array(
  StructField("timestamp", DataTypes.TimestampType),
  StructField("quantity", DataTypes.DoubleType),
  StructField("access", DataTypes.StringType),
  StructField("param1", DataTypes.StringType),
  StructField("param2", DataTypes.StringType),
  StructField("param3", DataTypes.StringType),
  StructField("param4", DataTypes.StringType),
  StructField("param5", DataTypes.StringType),
  StructField("param6", DataTypes.StringType),
  StructField("param7", DataTypes.StringType),
  StructField("param8", DataTypes.StringType),
  StructField("param9", DataTypes.StringType),
  StructField("dateParam1", DataTypes.TimestampType),
  StructField("dateParam2", DataTypes.TimestampType),
  StructField("dateParam3", DataTypes.TimestampType),
  StructField("dateParam4", DataTypes.TimestampType),
  StructField("dateParam5", DataTypes.TimestampType),
  StructField("decimalParam1", DataTypes.DoubleType),
  StructField("decimalParam2", DataTypes.DoubleType),
  StructField("decimalParam3", DataTypes.DoubleType),
  StructField("decimalParam4", DataTypes.DoubleType),
  StructField("decimalParam5", DataTypes.DoubleType),
  StructField("extraParam", DataTypes.StringType)))

我使用此命令加载CSV文档

val cdr = spark.read.format("csv").option("header", "true").option("delimiter", ";").schema(cdrCSVSchema).csv("CDR_SAMPLE.csv")

然后尝试使用此命令编码并运行lambda函数,但我仍然收到错误

cdr.as[CDR].filter(c => c.timestamp != null).show
java apache-spark apache-spark-sql apache-spark-dataset
1个回答
1
投票

TL; DR显式定义模式,因为输入数据集没有用于推断类型的值(对于java.sql.Date字段)。

对于您的情况,使用无类型数据集API可能是一个解决方案(也许是一种解决方法,老实说,我建议它以避免从内部行格式不必要的反序列化):

cdr.filter(!$"timestamp".isNull).filter(length($"access") > 0).count

(这是Scala,我要把它翻译成Java作为家庭练习)。

问题是你使用inferSchema选项,输入CDR_SAMPLE.csv文件中的大多数字段都不可用,这使得大多数String类型的字段(当没有值可用于推断更具体的类型时,这是默认类型)。

这使得java.sql.Date类型的字段,即dateParam1类型为字符串的dateParam5

import org.opencell.spark.model.CDR
import org.apache.spark.sql.Encoders
implicit val cdrEnc = Encoders.bean(classOf[CDR])
val cdrs = spark.read.
  option("inferSchema", "true").
  option("delimiter", ";").
  option("header", true).
  csv("/Users/jacek/dev/sandbox/test-bigdata/CDR_SAMPLE.csv")
scala> cdrs.printSchema
root
 |-- timestamp: timestamp (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- access: string (nullable = true)
 |-- param1: string (nullable = true)
 |-- param2: string (nullable = true)
 |-- param3: string (nullable = true)
 |-- param4: string (nullable = true)
 |-- param5: string (nullable = true)
 |-- param6: string (nullable = true)
 |-- param7: string (nullable = true)
 |-- param8: string (nullable = true)
 |-- param9: string (nullable = true)
 |-- dateParam1: string (nullable = true)
 |-- dateParam2: string (nullable = true)
 |-- dateParam3: string (nullable = true)
 |-- dateParam4: string (nullable = true)
 |-- dateParam5: string (nullable = true)
 |-- decimalParam1: string (nullable = true)
 |-- decimalParam2: string (nullable = true)
 |-- decimalParam3: string (nullable = true)
 |-- decimalParam4: string (nullable = true)
 |-- decimalParam5: string (nullable = true)
 |-- extraParam: string (nullable = true)

请注意,感兴趣的字段,即dateParam1dateParam5,都是字符串。

 |-- dateParam1: string (nullable = true)
 |-- dateParam2: string (nullable = true)
 |-- dateParam3: string (nullable = true)
 |-- dateParam4: string (nullable = true)
 |-- dateParam5: string (nullable = true)

当您通过使用CDR类中定义的编码器“假装”字段类型时,问题浮出水面,该编码说:

private Date dateParam1;
private Date dateParam2;
private Date dateParam3; 
private Date dateParam4; 
private Date dateParam5; 

这是问题的根本原因。 Spark可以从课堂上推断出什么是不同的。如果没有转换,代码就会起作用,但既然你坚持......

cdrs.as[CDR]. // <-- HERE is the issue = types don't match
  filter(cdr => cdr.timestamp != null).
  show // <-- trigger conversion

filter运算符中访问哪个字段并不重要。问题是转换会导致执行错误(以及整个Java代码生成)。

我怀疑Spark可以做很多事情,因为你请求inferSchema的数据集没有值用于类型推断。最好的办法是明确定义模式,并使用schema(...)运算符进行设置。

© www.soinside.com 2019 - 2024. All rights reserved.