循环遍历数据帧并同时更新查找表:spark scala

问题描述 投票:1回答:1

我有一个像下面这样的DataFrame

+---+-------------+-----+
| id|AccountNumber|scale|
+---+-------------+-----+
|  1|      1500847|    6|
|  2|      1501199|    7|
|  3|      1119024|    3|
+---+-------------+-----+

我必须填充第二个DataFrame,它最初是空的,如下所示。

id  AccountNumber   scale
1   1500847         6
2   1501199         6
3   1119024         3

Output explaination

第一个DataFrame中的第一行的scale为6.检查结果中该值减去1(所以scale等于5)。没有,所以只需将行(1,1500847,6)添加到输出中。

输出中的第二行有一个scale为7.原始表已经有一行scale 7 - 1,所以添加这一行,但使用该比例(2, 15001199, 6)

第三行作为第一行。

scala apache-spark spark-dataframe
1个回答
1
投票

使用广播列表

您可以将scale列中的所有比例收集为数组,并将broadcast收集到udf函数中。然后使用udf逻辑中的when函数和withColumn作为

import org.apache.spark.sql.functions._
val collectedList = sc.broadcast(df.select(collect_list("scale")).collect()(0)(0).asInstanceOf[collection.mutable.WrappedArray[Int]])

import org.apache.spark.sql.functions._
def newScale = udf((scale: Int)=> collectedList.value.contains(scale))

df.withColumn("scale", when(newScale(col("scale")-1), col("scale")-1).otherwise(col("scale")))
  .show(false)

你应该有所需的输出

+---+-------------+-----+
|id |AccountNumber|scale|
+---+-------------+-----+
|1  |1500847      |6    |
|2  |1501199      |6    |
|3  |1119024      |3    |
+---+-------------+-----+

使用Window功能

我将建议的解决方案将要求您使用Window函数收集一个执行程序中的所有数据,以形成另一列scaleCheck,其将填充scale列中存在的所有比例

import org.apache.spark.sql.expressions.Window
def windowSpec = Window.orderBy("id").rowsBetween(Long.MinValue, Long.MaxValue)
val tempdf = df.withColumn("scaleCheck", collect_list("scale").over(windowSpec))

这会给你dataframe

+---+-------------+-----+----------+
|id |AccountNumber|scale|scaleCheck|
+---+-------------+-----+----------+
|1  |1500847      |6    |[6, 7, 3] |
|2  |1501199      |7    |[6, 7, 3] |
|3  |1119024      |3    |[6, 7, 3] |
+---+-------------+-----+----------+

然后你必须编写一个udf函数来检查行中的比例是否已经存在于收集的列表中。然后使用when函数并调用udf函数,您可以生成scale

import org.apache.spark.sql.functions._
def newScale = udf((scale: Int, scaleCheck: collection.mutable.WrappedArray[Int])=> scaleCheck.contains(scale))

tempdf.withColumn("scale", when(newScale(col("scale")-1, col("scaleCheck")), col("scale")-1).otherwise(col("scale")))
  .drop("scaleCheck")
  .show(false)

所以你的最终需要的dataframe已经实现,如上所述

我希望答案是有帮助的

© www.soinside.com 2019 - 2024. All rights reserved.