Hello, I am trying to test the following function, which takes a Dataset[Row] as a parameter:
def getReducedSubsidiaries(dsSubsidiaries: Dataset[Row])(implicit spark: SparkSession): Dataset[SubsidiariesImpacted] = {
  import spark.implicits._
  dsSubsidiaries.as[SubsidiariesImpactedStage]
    .groupByKey(_.subsidiary_uuid)
    .reduceGroups((a, b) => if (a.event_timestamp.compareTo(b.event_timestamp) >= 0) a else b)
    .map(_._2)
    .select(
      $"subsidiary_uuid",
      $"subsidiary_id",
      $"company_uuid"
    )
    .as[SubsidiariesImpacted]
}
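Before involving Spark at all, the keep-the-latest-record-per-key logic that `reduceGroups` implements can be sanity-checked with plain Scala collections. This is only a sketch: the `StageRow` case class below is an illustrative stand-in for `SubsidiariesImpactedStage`, with the field order assumed from the test data in the question.

```scala
import java.sql.Timestamp

// Illustrative stand-in for SubsidiariesImpactedStage (field order assumed).
case class StageRow(event_timestamp: Timestamp,
                    status: String,
                    subsidiary_uuid: String,
                    subsidiary_id: Long,
                    company_uuid: String)

val rows = Seq(
  StageRow(Timestamp.valueOf("2019-08-01 00:00:00"), "active", "sub_uuid1", 32L, "comp_uuid1"),
  StageRow(Timestamp.valueOf("2019-11-27 00:00:00"), "active", "sub_uuid1", 5L, "latest_comp_uuid1")
)

// Same reduction as reduceGroups: keep the row with the latest
// event_timestamp within each subsidiary_uuid group.
val latest = rows
  .groupBy(_.subsidiary_uuid)
  .values
  .map(_.reduce((a, b) => if (a.event_timestamp.compareTo(b.event_timestamp) >= 0) a else b))
```

With the two rows above, `latest` contains the single record carrying `"latest_comp_uuid1"`, which is what the Spark version should also select.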
I tried to create a dataset to pass to this function, but I am not sure how to convert the dataset I created into the Dataset[Row] it requires.
val ts1 = Timestamp.valueOf("2019-08-01 00:00:00")
val ts2 = Timestamp.valueOf("2019-09-20 00:00:00")
val ts3 = Timestamp.valueOf("2019-11-27 00:00:00")
val subsidiaries: Dataset[SubsidiariesImpactedStage] = Seq(
  SubsidiariesImpactedStage(ts1, "active", "sub_uuid1", 32L, "comp_uuid1"),
  SubsidiariesImpactedStage(ts2, "inactive", "sub_uuid1", 32L, "comp_uuid1"),
  SubsidiariesImpactedStage(ts3, "active", "sub_uuid1", 5L, "latest_comp_uuid1")
).toDS()
So simply convert the Seq to a DataFrame (with .toDF()) instead of a Dataset (.toDS()), and then pass it as the parameter to the function. A DataFrame is just a type alias for Dataset[Row], so it matches the expected parameter type exactly:
case class Person(name: String, age: Int)

def fun(d: Dataset[Row]) = d.show()

fun(Seq(Person("a", 1)).toDF())
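Applied to the data from the question, the same trick works: calling .toDF() on the Seq (or on the existing Dataset) yields the Dataset[Row] the function expects. A sketch of the test setup, assuming a local SparkSession (the builder settings here are illustrative):

```scala
import org.apache.spark.sql.{Dataset, Row, SparkSession}

// Local session for the test; settings are an assumption, adjust as needed.
implicit val spark: SparkSession = SparkSession.builder()
  .master("local[*]")
  .appName("getReducedSubsidiaries test")
  .getOrCreate()
import spark.implicits._

// .toDF() widens Dataset[SubsidiariesImpactedStage] to Dataset[Row],
// which is exactly the parameter type of getReducedSubsidiaries.
val input: Dataset[Row] = subsidiaries.toDF()
val result = getReducedSubsidiaries(input)
result.show()
```

Inside the function, .as[SubsidiariesImpactedStage] narrows the rows back to the typed view, so no information is lost by passing a DataFrame.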