pyspark: compute a moving average of the difference between the current and the previous activity time

Problem description · votes: 0 · answers: 2

I have records like this:

A    B
1    2018-12-25
2    2019-01-15
1    2019-01-20
3    2018-01-01
2    2019-01-01
4    2018-04-09
3    2018-11-08
1    2018-03-20

Here is what I want. Step 1: within each group, order the rows by date, ascending. (The groups themselves do not need to be ordered by A.) A rough PySpark sketch follows the table.

A    B
1    2018-03-20
1    2018-12-25
1    2019-01-20
3    2018-01-01
3    2018-11-08
2    2019-01-01
2    2019-01-15
4    2018-04-09
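
For step 1, a minimal sketch, assuming the records sit in a DataFrame named df with B already cast to a date. The explicit sort is mostly cosmetic here, because the windows used in the later steps order the rows within each group on their own:

# Order by group and by date within each group
df_sorted = df.orderBy('A', 'B')
df_sorted.show()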

Step 2: get the difference, in days, between consecutive rows within each group; again, a sketch follows the table.

A    B            C
1    2018-03-20   NaN
1    2018-12-25   280
1    2019-01-20   26
3    2018-01-01   NaN
3    2018-11-08   311
2    2019-01-01   NaN
2    2019-01-15   14
4    2018-04-09   NaN
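
For step 2, I would expect something along these lines (a sketch, again assuming the df from above): lag over a window partitioned by A and ordered by B fetches the previous activity date, and datediff turns the gap into days, leaving null on the first row of each group:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy('A').orderBy('B')

# Days between the current row and the previous row of the same group;
# the first row of each group has no previous row, so C is null there.
df_c = df.withColumn('C', F.datediff(F.col('B'), F.lag('B', 1).over(w)))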

Step 3: take the moving average of C over a window of size 2. (I picked size 2 only for convenience, since the example has so few rows; a sketch follows the table.)

A    B            C     moving_avg
1    2018-03-20   NaN   NaN
1    2018-12-25   280   280
1    2019-01-20   26    153
3    2018-01-01   NaN   NaN
3    2018-11-08   311   311
2    2019-01-01   NaN   NaN
2    2019-01-15   14    14
4    2018-04-09   NaN   NaN
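
For step 3, a sketch on top of the df_c from the previous sketch: avg over a rows frame spanning the previous and the current row is exactly a size-2 moving window, and since avg skips nulls, the second row of each group averages only its own C value:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Frame of (previous row, current row): a moving window of size 2
w2 = Window.partitionBy('A').orderBy('B').rowsBetween(-1, 0)

df_avg = df_c.withColumn('moving_avg', F.avg('C').over(w2))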

The solution does not actually have to materialize column C if a window function can handle this directly; I spelled out each step only so the problem is clear.

The final result set would look like this (a combined sketch follows the table):

A    B            moving_avg
1    2018-03-20   NaN
1    2018-12-25   280
1    2019-01-20   153
3    2018-01-01   NaN
3    2018-11-08   311
2    2019-01-01   NaN
2    2019-01-15   14
4    2018-04-09   NaN
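
Putting the pieces together, a self-contained sketch of what I have in mind; C is only an intermediate column and is dropped at the end (as far as I know Spark does not allow nesting one window expression inside another, so C is computed in its own withColumn rather than inlined into the avg):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy('A').orderBy('B')
w2 = w.rowsBetween(-1, 0)

# avg returns a double; cast or round it if integer output is needed
result = (df
          .withColumn('C', F.datediff('B', F.lag('B', 1).over(w)))
          .withColumn('moving_avg', F.avg('C').over(w2))
          .drop('C'))
result.show()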

Note: this is PySpark using DataFrames, not Python with Pandas.

Thanks a lot!

python pyspark moving-average
2 Answers
0 votes

Documentation: Window

Documentation: lag

# Creating a Dataframe
from pyspark.sql.window import Window
from pyspark.sql.functions import col, to_date, lag, datediff, udf
from pyspark.sql.types import IntegerType
df = sqlContext.createDataFrame([(1,'2018-12-25'),(2,'2019-01-15'),(1,'2019-01-20'),(3,'2018-01-01'),
                                 (2,'2019-01-01'),(4,'2018-04-09'),(3,'2018-11-08'),(1,'2018-03-20')],
                                 ['A','B'])
df = df.withColumn('B',to_date(col('B'), 'yyyy-MM-dd'))

# Using window and lag functions to find the value from previous row
my_window = Window.partitionBy('A').orderBy('A','B')

# Creating a UDF to calculate the average of a window of size 2, ignoring nulls.
def row_avg(c1, c2):
    count_non_null = 2
    if c1 is None:
        c1 = 0
        count_non_null -= 1
    if c2 is None:
        c2 = 0
        count_non_null -= 1
    if count_non_null == 0:
        return None
    return int((c1 + c2) / count_non_null)

row_avg = udf(row_avg, IntegerType())

df = df.withColumn('B_Lag_1', lag(col('B'),1).over(my_window))\
       .withColumn('C', datediff(col('B'),col('B_Lag_1'))).drop('B_Lag_1')\
       .withColumn('C_Lag_1', lag(col('C'),1).over(my_window))\
       .withColumn('moving_avg',row_avg(col('C'),col('C_Lag_1'))).drop('C','C_Lag_1')
df.show()
+---+----------+----------+
|  A|         B|moving_avg|
+---+----------+----------+
|  1|2018-03-20|      null|
|  1|2018-12-25|       280|
|  1|2019-01-20|       153|
|  3|2018-01-01|      null|
|  3|2018-11-08|       311|
|  2|2019-01-01|      null|
|  2|2019-01-15|        14|
|  4|2018-04-09|      null|
+---+----------+----------+

0 votes

There may be a smarter way to achieve this, but you can also use RDDs:

from operator import add
from datetime import datetime

from numpy import mean
from pyspark.sql.types import StructType, StructField, IntegerType, DateType

data = [(1, "2018-12-25"), (2, "2019-01-15"), (1, "2019-01-20"), (3, "2018-01-01"),
        (2, "2019-01-01"), (4, "2018-04-09"), (3, "2018-11-08"), (1, "2018-03-20")]
data = sc.parallelize(data).mapValues(lambda v: [datetime.strptime(v, "%Y-%m-%d")]).reduceByKey(add)

def computeMvgAvg(values):
    sorted_date = sorted(values)
    diffs = []
    mvg_avg = []
    for i in range(1, len(sorted_date)):
        diffs.append(int((sorted_date[i] - sorted_date[i-1]).total_seconds() / 86400))
        # Moving average over a window of size 2, i.e. over the last two diffs
        mvg_avg.append(int(mean(diffs[-2:])))
    diffs = [None] + diffs
    mvg_avg = [None] + mvg_avg
    return zip(sorted_date, diffs, mvg_avg)

sch = StructType([
   StructField("A", IntegerType(), True),
   StructField("B", DateType(), True),
   StructField("C", IntegerType(), True),
   StructField("moving_avg", IntegerType(), True)
])

data.flatMapValues(computeMvgAvg).map(lambda row: [row[0]] + list(row[1])).toDF(schema=sch).show()

+---+----------+----+----------+
|  A|         B|   C|moving_avg|
+---+----------+----+----------+
|  1|2018-03-20|null|      null|
|  1|2018-12-25| 280|       280|
|  1|2019-01-20|  26|       153|
|  2|2019-01-01|null|      null|
|  2|2019-01-15|  14|        14|
|  3|2018-01-01|null|      null|
|  3|2018-11-08| 311|       311|
|  4|2018-04-09|null|      null|
+---+----------+----+----------+