Average of the two largest preceding values of a column in a Spark DataFrame

Problem description (votes: -1, answers: 2)

Here is a sample DataFrame:

id  nd  time    value   
3   n1  7       50  
10  n1  3       40  
11  n1  5       30  
1   n1  2       20  
2   n1  6       20  
9   n1  4       10  
4   n1  1       10

The maximum time is 7, and I have to find the two largest values of value among the rows whose time is less than 7: those are 40 and 30, so newValue = value - avg(40, 30) = 50 - (40 + 30)/2 = 15.

Now the next largest time is 6, so I have to find the two largest values among the rows whose time is less than 6 (again 40 and 30), giving newValue = 20 - avg(40, 30) = -15.

Similarly, I have to do this for every row; the last two rows, which do not have two earlier values, must be null.

id  nd  time    value    NewVal
3   n1  7       50       15
10  n1  3       40       25
11  n1  5       30       0        ((40+20)/2 = 30; 30 - 30 = 0)
1   n1  2       20       Null
2   n1  6       20      -15
9   n1  4       10       20
4   n1  1       10       Null
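
For reference, a minimal sketch for building this sample DataFrame (assuming an active SparkSession named spark; this setup code is mine, not part of the question), so the answers below can be tested directly:

# Hypothetical setup: recreate the sample data shown above.
data = [(3, 'n1', 7, 50), (10, 'n1', 3, 40), (11, 'n1', 5, 30), (1, 'n1', 2, 20),
        (2, 'n1', 6, 20), (9, 'n1', 4, 10), (4, 'n1', 1, 10)]
df = spark.createDataFrame(data, ['id', 'nd', 'time', 'value'])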
sql apache-spark pyspark pyspark-sql pyspark-dataframes
2 Answers
1 vote

If the data can be partitioned appropriately, for example by the nd column in your example, I would use a Window function (or, if the data can be loaded into a single partition, remove partitionBy('nd') from the WindowSpec w1 below):

from pyspark.sql.functions import sort_array, collect_list, expr
from pyspark.sql import Window 

# w1: for each nd partition, all rows with a smaller time than the current row
w1 = Window.partitionBy('nd').orderBy('time').rowsBetween(Window.unboundedPreceding, -1)

# v1 holds the earlier values sorted descending, so v1[0] and v1[1] are the two largest
df.withColumn('v1', sort_array(collect_list('value').over(w1), False)) \
    .withColumn('NewVal', expr('value - (v1[0] + v1[1])*0.5')) \
    .show(10, False)
+---+---+----+-----+------------------------+------+                            
|id |nd |time|value|v1                      |NewVal|
+---+---+----+-----+------------------------+------+
|4  |n1 |1   |10   |[]                      |null  |
|1  |n1 |2   |20   |[10]                    |null  |
|10 |n1 |3   |40   |[20, 10]                |25.0  |
|9  |n1 |4   |10   |[40, 20, 10]            |-20.0 |
|11 |n1 |5   |30   |[40, 20, 10, 10]        |0.0   |
|2  |n1 |6   |20   |[40, 30, 20, 10, 10]    |-15.0 |
|3  |n1 |7   |50   |[40, 30, 20, 20, 10, 10]|15.0  |
+---+---+----+-----+------------------------+------+

UPDATE: to calculate the average of any N max values:

from pyspark.sql.functions import sort_array, collect_list, col, round                                      

N = 3

# subtract the average of the N largest earlier values; Python's sum() builds v1[0] + ... + v1[N-1]
df.withColumn('v1', sort_array(collect_list('value').over(w1),False)) \
    .withColumn('NewVal', round(col('value') - sum(col('v1')[i] for i in range(N))/N,2)) \
    .show(10, False)
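
Note that for rows with fewer than N earlier values, the missing array indexes come back as null (under Spark's default non-ANSI behaviour), so NewVal is null automatically, which matches the requirement. If you want to make the null handling explicit, here is a sketch using size and when (the null check is my addition, reusing the same df and w1):

from pyspark.sql.functions import sort_array, collect_list, col, round, size, when

N = 3

# Only compute NewVal when at least N earlier values exist; otherwise leave it null.
df.withColumn('v1', sort_array(collect_list('value').over(w1), False)) \
    .withColumn('NewVal', when(size(col('v1')) >= N,
                               round(col('value') - sum(col('v1')[i] for i in range(N)) / N, 2))) \
    .show(10, False)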

0 votes

I wrote a UDF that solves your problem. Following your own logic, the NewValue for time 4 should be -20, not 20, which is what my code produces. Please confirm.

>>> from pyspark.sql.types import StringType
>>> from pyspark.sql.functions import udf,col,concat_ws,collect_list
>>> from pyspark.sql.window import Window
>>> df.show()
+---+---+----+-----+
| id| nd|time|value|
+---+---+----+-----+
|  3| n1|   7|   50|
| 10| n1|   3|   40|
| 11| n1|   5|   30|
|  1| n1|   2|   20|
|  2| n1|   6|   20|
|  9| n1|   4|   10|
|  4| n1|   1|   10|
+---+---+----+-----+

>>> df.cache()
>>> cnt = df.count()
>>> def sampleFun(allvalue,value):
...     output = ''
...     # drop the current row's own value from the comma-separated list (once)
...     firstValue = allvalue.replace(str(value) + ',','', 1)
...     firstList =  [int(i) for i in firstValue.split(',')]
...     if len(firstList) > 1:
...             max_1 = max(firstList)
...             secondValue = firstValue.replace(str(max_1) + ',','', 1)
...             secondList = [int(i) for i in secondValue.split(",")]
...             max_2 = max(secondList)
...             avgValue = (max_1 + max_2)/2
...             output = (int(value) - avgValue)
...             return str(output)
...     else:
...             return ''

>>> sampleUDF = udf(sampleFun, StringType())
>>> W = Window.rowsBetween(0,cnt).orderBy(col("time").desc())
>>> df1 = df.withColumn("ListValue", concat_ws(",",collect_list(col("value")).over(W)))
>>> df2 = df1.withColumn("NewValue", sampleUDF(col("ListValue"), col("value"))).drop("ListValue")
>>> df2.show()
+---+---+----+-----+--------+                                                   
| id| nd|time|value|NewValue|
+---+---+----+-----+--------+
|  3| n1|   7|   50|    15.0|
|  2| n1|   6|   20|   -15.0|
| 11| n1|   5|   30|     0.0|
|  9| n1|   4|   10|   -20.0|
| 10| n1|   3|   40|    25.0|
|  1| n1|   2|   20|        |
|  4| n1|   1|   10|        |
+---+---+----+-----+--------+
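
As a possible follow-up (my sketch, not part of the original answer): the same idea can be written so the UDF receives the collected values as an array instead of a comma-separated string, which avoids the string parsing. The window below excludes the current row, so there is nothing to strip out; the helper name avgTopTwo is mine, and it reuses df, cnt and the imports from above.

>>> from pyspark.sql.types import DoubleType
>>> def avgTopTwo(others):
...     # 'others' holds the values of all rows with a strictly smaller time
...     if others is None or len(others) < 2:
...             return None
...     top = sorted(others, reverse=True)[:2]
...     return float(top[0] + top[1]) / 2
...
>>> avgUDF = udf(avgTopTwo, DoubleType())
>>> W2 = Window.orderBy(col("time").desc()).rowsBetween(1, cnt)
>>> df.withColumn("NewValue", col("value") - avgUDF(collect_list(col("value")).over(W2))).show()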