Basically, I want to apply the function countSimilarColumns to every row of a DataFrame and put the result in a new column.
My code is as follows:
def main(args: Array[String]) = {
  val customerID = "customer-1" //args(0)
  val rawData = readFromResource("json", "/spark-test-data-copy.json")
  val flattenData = rawData.select(flattenSchema(rawData.schema): _*)
  val referenceCustomerRow = flattenData.transform(getCustomer(customerID)).first
}

def countSimilarColumns(first: Row, second: Row): Int = {
  if (!(first.getAs[String]("customer").equals(second.getAs[String]("customer"))))
    first.toSeq.zip(second.toSeq).count { case (x, y) => x == y }
  else
    -1
}
I want to do something like the following, but I don't know how to do it:
flattenData
  .withColumn(
    "similarity_score",
    flattenData.map(row => countSimilarColumns(row, referenceCustomerRow))
  )
  .show()
Sample data (flattened):
{"customer":"customer-1","att-a":"7","att-b":"3","att-c":"10","att-d":"10"}
{"customer":"customer-2","att-a":"9","att-b":"7","att-c":"12","att-d":"4"}
{"customer":"customer-3","att-a":"7","att-b":"3","att-c":"1","att-d":"10"}
{"customer":"customer-4","att-a":"9","att-b":"14","att-c":"10","att-d":"4"}
Desired output:
+----------+----------------+
|customer  |similarity_score|
+----------+----------------+
|customer-1|-1              |
|customer-2|0               |
|customer-3|3               |
|customer-4|1               |
+----------+----------------+
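To make the expected scores easy to check by hand, here is a small plain-Scala sketch (no Spark involved) that applies the same comparison rule as countSimilarColumns to the sample data, with customer-1 as the reference. The names Customer and score are hypothetical and exist only for this illustration; it is written as a script for the Scala REPL or Scala 3.

```scala
// Plain-Scala check of the expected scores; no Spark involved.
// `Customer` and `score` are hypothetical names used only for this illustration.
final case class Customer(id: String, attrs: Seq[String])

// Same rule as countSimilarColumns: -1 for the reference customer itself,
// otherwise the number of attribute positions holding equal values.
def score(row: Customer, reference: Customer): Int =
  if (row.id != reference.id)
    row.attrs.zip(reference.attrs).count { case (x, y) => x == y }
  else
    -1

// Attribute order is att-a, att-b, att-c, att-d, as in the sample data.
val customers = Seq(
  Customer("customer-1", Seq("7", "3", "10", "10")),
  Customer("customer-2", Seq("9", "7", "12", "4")),
  Customer("customer-3", Seq("7", "3", "1", "10")),
  Customer("customer-4", Seq("9", "14", "10", "4"))
)

val reference = customers.head
val scores = customers.map(c => c.id -> score(c, reference))
scores.foreach { case (id, s) => println(s"$id $s") }
```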
flattenData is of type DataFrame, and applying the map function to flattenData yields a Dataset.
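Since withColumn expects a Column rather than a Dataset, one possible approach is to pack all columns of each row into a struct and pass that struct to a UDF, which receives it as a Row. The following is only a sketch under assumptions: withSimilarityScore is a hypothetical helper name, the reference row is assumed to come from the same DataFrame (so the column order matches), and all column values are assumed to be serializable types such as String.

```scala
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, struct, udf}

// Hypothetical helper (not part of the original code): adds "similarity_score"
// by comparing every row of `df` with `reference`.
// Assumption: `reference` was taken from `df`, so both have the same column order.
def withSimilarityScore(df: DataFrame, reference: Row): DataFrame = {
  // Extract plain values up front so the UDF closure only captures simple data.
  val referenceValues = reference.toSeq
  val referenceCustomer = reference.getAs[String]("customer")

  // Same rule as countSimilarColumns: -1 for the reference customer itself,
  // otherwise the number of positions where both rows hold equal values.
  val score = udf { (row: Row) =>
    if (row.getAs[String]("customer") != referenceCustomer)
      row.toSeq.zip(referenceValues).count { case (x, y) => x == y }
    else
      -1
  }

  // struct(...) bundles all columns into one column that the UDF sees as a Row.
  df.withColumn("similarity_score", score(struct(df.columns.map(col): _*)))
}
```

With the values from main, this could be called as withSimilarityScore(flattenData, referenceCustomerRow).select("customer", "similarity_score").show().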