我有两个数据集,ds1 和 ds2,它们具有以下各自的模式:
地址DS:
> addressDS.printSchema()
root
|-- endId: string (nullable = true)
|-- parentId: string (nullable = true)
|-- address: string (nullable = true)
|-- countries: string (nullable = true)
|-- sourceId: string (nullable = true)
edgeDS:
edgeDS.printSchema()
root
|-- endId: string (nullable = true)
|-- startId: string (nullable = true)
|-- edgeType: string (nullable = true)
|-- link: string (nullable = true)
我对这两个数据集进行了内部连接:
> val joinedDS = addressDS.join(edgeDS, "endId")
> joinedDS.printSchema()
root
|-- endId: string (nullable = true)
|-- parentId: string (nullable = true)
|-- address: string (nullable = true)
|-- countries: string (nullable = true)
|-- sourceId: string (nullable = true)
|-- startId: string (nullable = true)
|-- edgeType: string (nullable = true)
|-- link: string (nullable = true)
如您所见,Spark 解释器知道模式是两者的组合。接下来,我想对字段
startId
上的结果数据集执行 groupByKey。然而,尽管 Spark 解释器知道此模式,但它无法这样做,从而产生以下错误:
> val groupedDS = joinedDS.groupByKey{ x => x .startId }
<console>:38: error: value startId is not a member of org.apache.spark.sql.Row
val groupedDS = joinedDS.groupByKey{ x => x.startId }
我对这个话题做了很多研究。 Scala 中
groupByKey
函数的语法如下:
groupByKey[K](func: (T) ⇒ K)(implicit arg0: Encoder[K]): KeyValueGroupedDataset[K, T]
在这个例子中,我的大括号是语法糖,代表lambda函数:
(x: Row) => x.getAs[String]("startId")
编译器应该能够推断出这一点,因为根据架构,它知道
startId
类型是String
。但显然不是,我必须明确地输入:
> val groupedDS = joinedDS.groupByKey{ x => x.getAs[String]("startId") }
尽管模式已知,但为什么 Spark 不能推断出这种类型?为什么我必须使用
.getAs
手动设置类型?