I have a Dataset that I want to convert to a typed Dataset, where the type is a case class with several Option parameters. For example, using the spark shell I create a case class, an encoder, and a (raw) Dataset:
case class Analogue(id: Long, t1: Option[Double] = None, t2: Option[Double] = None)
val df = Seq((1, 34.0), (2, 3.4)).toDF("id", "t1")
implicit val analogueChannelEncoder: Encoder[Analogue] = Encoders.product[Analogue]
I want to create a Dataset[Analogue] from df, so I try:
df.as(analogueChannelEncoder)
But this results in an error:
org.apache.spark.sql.AnalysisException: cannot resolve '`t2`' given input columns: [id, t1];
Looking at the schemas of df and analogueChannelEncoder, the difference is obvious:
scala> df.schema
res3: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,false), StructField(t1,DoubleType,false))
scala> analogueChannelEncoder.schema
res4: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,false), StructField(t1,DoubleType,true), StructField(t2,DoubleType,true))
I have seen this answer, but it does not work for me because my Dataset is assembled in code and is not a direct load from a data source.
How can I cast the untyped Dataset[Row] to Dataset[Analogue]?
Your case class:
case class Analogue(id: Long, t1: Option[Double] = None, t2: Option[Double] = None)
Your transformation code...
val encoderSchema = Encoders.product[Analogue].schema
val df1: Dataset[Row] = spark.createDataset(Seq((1, 34.0), (2, 3.4)))
  .map(x => Analogue(x._1, Option(x._2), None))
  .toDF("id", "t1", "t2")
df1.show
df1.printSchema()
encoderSchema.printTreeString()
Result:
+---+----+----+
| id| t1| t2|
+---+----+----+
| 1|34.0|null|
| 2| 3.4|null|
+---+----+----+
root
|-- id: long (nullable = false)
|-- t1: double (nullable = true)
|-- t2: double (nullable = true)
root
|-- id: long (nullable = false)
|-- t1: double (nullable = true)
|-- t2: double (nullable = true)
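With all three columns now present, the untyped frame can be cast back to the typed Dataset (a small addition to tie this back to the original question):
val ds: Dataset[Analogue] = df1.as[Analogue] // succeeds: the schema now matches the encoder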
I assume your case class may have many fields (5 fields, for example) whose Option values are None... it works the same way.
Here is an example:
case class Analogue(id: Long, t1: Option[Double] = None, t2: Option[Double] = None, t3: Option[Double] = None, t4: Option[Double] = None, t5: Option[Double] = None)
val encoderSchema = Encoders.product[Analogue].schema
println(encoderSchema.toSeq)
val df1 = spark.createDataset(Seq((1, 34.0), (2, 3.4)))
.map(x => Analogue(x._1, Option(x._2)))
.as[Analogue].toDF()
df1.show
df1.printSchema()
encoderSchema.printTreeString()
If you set only the fields that are present, the remaining fields are treated as None.
StructType(StructField(id,LongType,false), StructField(t1,DoubleType,true), StructField(t2,DoubleType,true), StructField(t3,DoubleType,true), StructField(t4,DoubleType,true), StructField(t5,DoubleType,true))
+---+----+----+----+----+----+
| id| t1| t2| t3| t4| t5|
+---+----+----+----+----+----+
| 1|34.0|null|null|null|null|
| 2| 3.4|null|null|null|null|
+---+----+----+----+----+----+
root
|-- id: long (nullable = false)
|-- t1: double (nullable = true)
|-- t2: double (nullable = true)
|-- t3: double (nullable = true)
|-- t4: double (nullable = true)
|-- t5: double (nullable = true)
root
|-- id: long (nullable = false)
|-- t1: double (nullable = true)
|-- t2: double (nullable = true)
|-- t3: double (nullable = true)
|-- t4: double (nullable = true)
|-- t5: double (nullable = true)
If it does not work this way, consider the broadcast idea from my comment and build on that.
I have solved this by inspecting the columns of the "incoming" Dataset[Row] and comparing them with the columns of Dataset[Analogue]. I use the resulting difference to append new columns to the Dataset[Row] before casting it to Dataset[Analogue].
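A minimal sketch of that approach (the withMissing name and the lit(null).cast(...) technique are my own illustration, not the exact original code); it assumes the implicit Encoder[Analogue] from the question is in scope, as it is in the spark-shell session above:
import org.apache.spark.sql.{DataFrame, Dataset, Encoders}
import org.apache.spark.sql.functions.lit

// Fields required by the target type that the incoming frame lacks
val targetSchema = Encoders.product[Analogue].schema
val missing = targetSchema.fields.filterNot(f => df.columns.contains(f.name))

// Append each missing field as a typed null column, then cast.
// Spark upcasts id from Int to Long during the cast, so no extra handling is needed.
val withMissing: DataFrame = missing.foldLeft(df) { (acc, f) =>
  acc.withColumn(f.name, lit(null).cast(f.dataType))
}
val ds: Dataset[Analogue] = withMissing.as[Analogue]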