Converting a Dataset to a typed Dataset whose case class has Option parameters


I have a Dataset that I want to convert to a typed Dataset, where the type is a case class with several Option parameters. For example, using the spark shell I create a case class, an encoder, and a (raw) Dataset:

import org.apache.spark.sql.{Encoder, Encoders}  // not in scope by default, even in spark-shell

case class Analogue(id: Long, t1: Option[Double] = None, t2: Option[Double] = None)
val df = Seq((1, 34.0), (2, 3.4)).toDF("id", "t1")
implicit val analogueChannelEncoder: Encoder[Analogue] = Encoders.product[Analogue]

I want to create a Dataset[Analogue] from df, so I try:

df.as(analogueChannelEncoder)

But this results in an error:

org.apache.spark.sql.AnalysisException: cannot resolve '`t2`' given input columns: [id, t1];

Looking at the schemas of df and analogueChannelEncoder, the difference is apparent:

scala> df.schema
res3: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,false), StructField(t1,DoubleType,false))

scala> analogueChannelEncoder.schema
res4: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,false), StructField(t1,DoubleType,true), StructField(t2,DoubleType,true))
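
The missing column can be read straight off the two schemas. As a quick illustration (a sketch; comparing fieldNames looks only at names, not at type differences such as id being Int in df but Long in the encoder):

  // which fields the encoder expects that df lacks
  val missingCols = analogueChannelEncoder.schema.fieldNames.diff(df.schema.fieldNames)
  // missingCols: Array[String] = Array(t2)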

I have seen this answer, but it does not work for me because my Dataset is already assembled and is not a direct load from a data source.

How do I cast my untyped Dataset[Row] to Dataset[Analogue]?

scala apache-spark
2 Answers
1 vote

Your case class:

  case class Analogue(id: Long, t1: Option[Double] = None, t2: Option[Double] = None)

Your transformation code:

  import org.apache.spark.sql.{Dataset, Encoders, Row}
  import spark.implicits._  // already in scope in spark-shell

  val encoderSchema = Encoders.product[Analogue].schema
  val df1: Dataset[Row] = spark.createDataset(Seq((1, 34.0), (2, 3.4)))
    .map(x => Analogue(x._1, Option(x._2), None))
    .toDF("id", "t1", "t2")
  df1.show

  df1.printSchema()
  encoderSchema.printTreeString()

Result:

+---+----+----+
| id|  t1|  t2|
+---+----+----+
|  1|34.0|null|
|  2| 3.4|null|
+---+----+----+

root
 |-- id: long (nullable = false)
 |-- t1: double (nullable = true)
 |-- t2: double (nullable = true)

root
 |-- id: long (nullable = false)
 |-- t1: double (nullable = true)
 |-- t2: double (nullable = true)
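
Since df1 now carries all three columns the encoder expects, the cast that failed in the question should go through, for example:

  val ds: Dataset[Analogue] = df1.as[Analogue]
  ds.show()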

Update (added more columns to the case class, all of them Options):

I am assuming your case class has many fields (for example, 5 fields) whose Option values are None. It works as follows...

Here is an example:


  case class Analogue(id: Long, t1: Option[Double] = None, t2: Option[Double] = None, t3: Option[Double] = None, t4: Option[Double] = None, t5: Option[Double] = None)

  val encoderSchema = Encoders.product[Analogue].schema
  println(encoderSchema.toSeq)
  val df1 = spark.createDataset(Seq((1, 34.0), (2, 3.4)))
    .map(x => Analogue(x._1, Option(x._2)))
    .as[Analogue].toDF()
  df1.show
  df1.printSchema()
  encoderSchema.printTreeString()

If you set only the fields that are present, the remaining fields are treated as None:

StructType(StructField(id,LongType,false), StructField(t1,DoubleType,true), StructField(t2,DoubleType,true), StructField(t3,DoubleType,true), StructField(t4,DoubleType,true), StructField(t5,DoubleType,true))
+---+----+----+----+----+----+
| id|  t1|  t2|  t3|  t4|  t5|
+---+----+----+----+----+----+
|  1|34.0|null|null|null|null|
|  2| 3.4|null|null|null|null|
+---+----+----+----+----+----+

root
 |-- id: long (nullable = false)
 |-- t1: double (nullable = true)
 |-- t2: double (nullable = true)
 |-- t3: double (nullable = true)
 |-- t4: double (nullable = true)
 |-- t5: double (nullable = true)

root
 |-- id: long (nullable = false)
 |-- t1: double (nullable = true)
 |-- t2: double (nullable = true)
 |-- t3: double (nullable = true)
 |-- t4: double (nullable = true)
 |-- t5: double (nullable = true)
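
As a small check (a sketch, assuming the df1 from above), collecting back as the case class shows the unset fields as None:

  val back = df1.as[Analogue].collect()
  // back: Array[Analogue] = Array(Analogue(1,Some(34.0),None,None,None,None),
  //                               Analogue(2,Some(3.4),None,None,None,None))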

If it does not work this way, consider the broadcast idea from my comment and build on that.


0 votes

I resolved this by inspecting the columns of the "incoming" Dataset[Row] and comparing them with the columns of Dataset[Analogue]. I use the resulting difference to append new columns to the Dataset[Row] before casting it to Dataset[Analogue].
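
A minimal sketch of that approach (the names widened and missing are illustrative, not from the original answer): take the field list from the encoder schema, append each absent field as a typed null column, then cast.

  import org.apache.spark.sql.{DataFrame, Dataset, Encoders}
  import org.apache.spark.sql.functions.lit
  import spark.implicits._

  val targetSchema = Encoders.product[Analogue].schema
  // fields the encoder expects but the incoming DataFrame lacks
  val missing = targetSchema.filterNot(f => df.columns.contains(f.name))

  // append each missing field as a null literal of the expected type,
  // then cast the widened DataFrame to the case class
  val widened: DataFrame = missing.foldLeft(df) { (acc, field) =>
    acc.withColumn(field.name, lit(null).cast(field.dataType))
  }
  val ds: Dataset[Analogue] = widened.as[Analogue]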
