我有一个像下面这样的DataFrame
。
+---+-------------+-----+
| id|AccountNumber|scale|
+---+-------------+-----+
| 1| 1500847| 6|
| 2| 1501199| 7|
| 3| 1119024| 3|
+---+-------------+-----+
我必须填充第二个DataFrame
,它最初是空的,如下所示。
id AccountNumber scale
1 1500847 6
2 1501199 6
3 1119024 3
第一个DataFrame
中的第一行的scale
为6.检查结果中该值减去1(所以scale
等于5)。没有,所以只需将行(1,1500847,6)
添加到输出中。
输出中的第二行有一个scale
为7.原始表已经有一行scale
7 - 1,所以添加这一行,但使用该比例(2, 15001199, 6)
。
第三行作为第一行。
使用广播列表
您可以将scale
列中的所有比例收集为数组,并将broadcast
收集到udf
函数中。然后使用udf
逻辑中的when
函数和withColumn
作为
import org.apache.spark.sql.functions._
val collectedList = sc.broadcast(df.select(collect_list("scale")).collect()(0)(0).asInstanceOf[collection.mutable.WrappedArray[Int]])
import org.apache.spark.sql.functions._
def newScale = udf((scale: Int)=> collectedList.value.contains(scale))
df.withColumn("scale", when(newScale(col("scale")-1), col("scale")-1).otherwise(col("scale")))
.show(false)
你应该有所需的输出
+---+-------------+-----+
|id |AccountNumber|scale|
+---+-------------+-----+
|1 |1500847 |6 |
|2 |1501199 |6 |
|3 |1119024 |3 |
+---+-------------+-----+
使用Window功能
我将建议的解决方案将要求您使用Window
函数收集一个执行程序中的所有数据,以形成另一列scaleCheck
,其将填充scale
列中存在的所有比例
import org.apache.spark.sql.expressions.Window
def windowSpec = Window.orderBy("id").rowsBetween(Long.MinValue, Long.MaxValue)
val tempdf = df.withColumn("scaleCheck", collect_list("scale").over(windowSpec))
这会给你dataframe
+---+-------------+-----+----------+
|id |AccountNumber|scale|scaleCheck|
+---+-------------+-----+----------+
|1 |1500847 |6 |[6, 7, 3] |
|2 |1501199 |7 |[6, 7, 3] |
|3 |1119024 |3 |[6, 7, 3] |
+---+-------------+-----+----------+
然后你必须编写一个udf
函数来检查行中的比例是否已经存在于收集的列表中。然后使用when
函数并调用udf
函数,您可以生成scale
值
import org.apache.spark.sql.functions._
def newScale = udf((scale: Int, scaleCheck: collection.mutable.WrappedArray[Int])=> scaleCheck.contains(scale))
tempdf.withColumn("scale", when(newScale(col("scale")-1, col("scaleCheck")), col("scale")-1).otherwise(col("scale")))
.drop("scaleCheck")
.show(false)
所以你的最终需要的dataframe
已经实现,如上所述
我希望答案是有帮助的