Spark: add a column using map logic, without a UDF

Problem description

Basically, I want to apply the function countSimilarColumns to every row of my DataFrame and put the result in a new column.

My code is as follows:

def main(args: Array[String]): Unit = {
    val customerID           = "customer-1" // args(0)
    // readFromResource, flattenSchema and getCustomer are helpers defined elsewhere
    val rawData              = readFromResource("json", "/spark-test-data-copy.json")
    val flattenData          = rawData.select(flattenSchema(rawData.schema): _*)
    val referenceCustomerRow = flattenData.transform(getCustomer(customerID)).first
  }
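For context, readFromResource, flattenSchema and getCustomer are not shown in the question. A minimal sketch of what they might look like follows; these bodies are assumptions, not the question's actual code:

import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType

// Assumed: load a test file from the classpath into a DataFrame.
def readFromResource(format: String, path: String)(implicit spark: SparkSession): DataFrame =
  spark.read.format(format).load(getClass.getResource(path).getPath)

// Assumed: recursively promote nested struct fields to top-level columns.
def flattenSchema(schema: StructType, prefix: String = null): Array[Column] =
  schema.fields.flatMap { field =>
    val name = if (prefix == null) field.name else s"$prefix.${field.name}"
    field.dataType match {
      case st: StructType => flattenSchema(st, name)
      case _              => Array(col(name))
    }
  }

// Assumed: keep only the row(s) belonging to the given customer ID.
def getCustomer(customerID: String)(df: DataFrame): DataFrame =
  df.filter(col("customer") === customerID)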

// Counts how many column values two rows have in common; returns -1 when both
// rows belong to the same customer (i.e. a row is compared with itself).
def countSimilarColumns(first: Row, second: Row): Int = {
    if (!(first.getAs[String]("customer").equals(second.getAs[String]("customer"))))
      first.toSeq.zip(second.toSeq).count { case (x, y) => x == y }
    else
      -1
  }

I want to do something like the following, but I don't know how to make it work:

flattenData
  .withColumn(
    "similarity_score",
    // does not compile: withColumn expects a Column, but map returns a Dataset
    flattenData.map(row => countSimilarColumns(row, referenceCustomerRow))
  )
  .show()

Sample flattened data:

{"customer":"customer-1","att-a":"7","att-b":"3","att-c":"10","att-d":"10"}
{"customer":"customer-2","att-a":"9","att-b":"7","att-c":"12","att-d":"4"}
{"customer":"customer-3","att-a":"7","att-b":"3","att-c":"1","att-d":"10"}
{"customer":"customer-4","att-a":"9","att-b":"14","att-c":"10","att-d":"4"}

Desired output:

+----------+----------------+
|  customer|similarity_score|
+----------+----------------+
|customer-1|              -1|
|customer-2|               0|
|customer-3|               3|
|customer-4|               1|
+----------+----------------+

countSimilarColumns should stay unchanged, so that it can be tested on its own. How can I do this? I am new to Spark/Scala.
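Since the question stresses keeping countSimilarColumns testable in isolation, here is a minimal sketch of a unit test, assuming the schema of the sample data above; GenericRowWithSchema is a Spark-internal class commonly used to build schema-aware Rows in tests, so treat this as one possible approach rather than part of the question:

import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schema = StructType(
  Seq("customer", "att-a", "att-b", "att-c", "att-d").map(StructField(_, StringType))
)

// Rows taken from the sample data above.
val ref   = new GenericRowWithSchema(Array[Any]("customer-1", "7", "3", "10", "10"), schema)
val other = new GenericRowWithSchema(Array[Any]("customer-3", "7", "3", "1", "10"), schema)

assert(countSimilarColumns(other, ref) == 3)  // att-a, att-b and att-d match
assert(countSimilarColumns(ref, ref) == -1)   // same customer => -1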

scala apache-spark apache-spark-sql
1 Answer
flattenData is of type DataFrame, and applying map to flattenData gives you a Dataset as the result, not a Column, which is why it cannot be passed to withColumn.
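Building on that, one way to get the desired output without a UDF is to map over the rows and append the score to each one, supplying an explicit Row encoder for the widened schema. This is a sketch rather than a definitive answer, and it assumes a Spark version (2.x/3.x) where RowEncoder(schema) from org.apache.spark.sql.catalyst.encoders is available:

import org.apache.spark.sql.{Encoder, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.IntegerType

// Widen the existing schema with the new column and derive an encoder for it.
val newSchema = flattenData.schema.add("similarity_score", IntegerType)
implicit val encoder: Encoder[Row] = RowEncoder(newSchema)

// map compares every row against the reference row and appends the score.
val withScore = flattenData.map { row =>
  Row.fromSeq(row.toSeq :+ countSimilarColumns(row, referenceCustomerRow))
}

withScore.select("customer", "similarity_score").show()

Because map goes through countSimilarColumns unchanged, the function stays a plain Row => Int that can be unit-tested exactly as the question requires.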