How to create a Dataset, or convert one into a Dataset[Row]

Question — votes: 0, answers: 1

Hi, I am trying to test the following function, which takes a Dataset[Row] as a parameter:

  def getReducedSubsidiaries(dsSubsidiaries: Dataset[Row])(implicit spark: SparkSession): Dataset[SubsidiariesImpacted] = {
    import spark.implicits._
    dsSubsidiaries.as[SubsidiariesImpactedStage]
      .groupByKey(_.subsidiary_uuid)
      .reduceGroups((a, b) => if (a.event_timestamp.compareTo(b.event_timestamp) >= 0) a else b)
      .map(_._2)
      .select(
        $"subsidiary_uuid",
        $"subsidiary_id",
        $"company_uuid"
      )
      .as[SubsidiariesImpacted]
  }

I am trying to create a dataset to pass through this function, but I am not sure how to convert the dataset I created into the Dataset[Row] it requires.


      val ts1 = Timestamp.valueOf("2019-08-01 00:00:00")
      val ts2 = Timestamp.valueOf("2019-09-20 00:00:00")
      val ts3 = Timestamp.valueOf("2019-11-27 00:00:00")
      val subsidiaries: Dataset[SubsidiariesImpactedStage] = Seq(
        SubsidiariesImpactedStage(ts1, "active", "sub_uuid1", 32L, "comp_uuid1"),
        SubsidiariesImpactedStage(ts2, "inactive", "sub_uuid1", 32L, "comp_uuid1"),
        SubsidiariesImpactedStage(ts3, "active", "sub_uuid1", 5L, "latest_comp_uuid1")
      ).toDS()
scala apache-spark apache-spark-sql dataset apache-spark-dataset
1 Answer

0 votes
Dataframe是DataSet [Row](> spark 2.0)

So simply convert the Seq to a DataFrame instead of a Dataset, and pass that to the function:

    case class Person(name: String, age: Int)

    def fun(d: Dataset[Row]) = d.show()

    fun(Seq(Person("a", 1)).toDF())
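Applied to the question's data, that means building the test input with toDF() rather than toDS(). A sketch, assuming the SubsidiariesImpactedStage case class and getReducedSubsidiaries function from the question are in scope:

```scala
import java.sql.Timestamp
import org.apache.spark.sql.{Dataset, Row, SparkSession}

// Local session for the test; in a real suite this would come from a fixture
implicit val spark: SparkSession =
  SparkSession.builder().master("local[*]").appName("test").getOrCreate()
import spark.implicits._

val ts1 = Timestamp.valueOf("2019-08-01 00:00:00")
val ts2 = Timestamp.valueOf("2019-09-20 00:00:00")

// toDF() (not toDS()) produces a DataFrame, i.e. a Dataset[Row]
val input: Dataset[Row] = Seq(
  SubsidiariesImpactedStage(ts1, "active", "sub_uuid1", 32L, "comp_uuid1"),
  SubsidiariesImpactedStage(ts2, "inactive", "sub_uuid1", 32L, "comp_uuid1")
).toDF()

val result = getReducedSubsidiaries(input)
```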
