如何计算上一次满足条件之间的天数?

问题描述 投票:0回答:2

当前df:

df = spark.createDataFrame([
    ("2020-01-12","d1",0),
    ("2020-01-12","d2",0),
    ("2020-01-13","d3",0),
    ("2020-01-14","d4",1), 
    ("2020-01-15","d5",0),
    ("2020-01-15","d6",0),
    ("2020-01-16","d7",0),
    ("2020-01-17","d8",0),
    ("2020-01-18","d9",1),
    ("2020-01-19","d10",0),
    ("2020-01-20","d11",0),], 
    ['date', 'device', 'condition'])

df.show()

+----------+------+---------+
|      date|device|condition|
+----------+------+---------+
|2020-01-12|    d1|        0|
|2020-01-12|    d2|        0|
|2020-01-13|    d3|        0|
|2020-01-14|    d4|        1|
|2020-01-15|    d5|        0|
|2020-01-15|    d6|        0|
|2020-01-16|    d7|        0|
|2020-01-17|    d8|        0|
|2020-01-18|    d9|        1|
|2020-01-19|   d10|        0|
|2020-01-20|   d11|        0|
+----------+------+---------+

所需的输出df:

want_df = spark.createDataFrame([
    ("2020-01-12","d1",0,0),
    ("2020-01-12","d2",0,0),
    ("2020-01-13","d3",0,1),
    ("2020-01-14","d4",1,2), 
    ("2020-01-15","d5",0,1),
    ("2020-01-15","d6",0,1),
    ("2020-01-16","d7",0,2),
    ("2020-01-17","d8",0,3),
    ("2020-01-18","d9",1,4),
    ("2020-01-19","d10",0,1),
    ("2020-01-20","d11",0,2),], 
    ['date', 'device', 'condition', 'life'])

want_df.show()

+----------+------+---------+----+
|      date|device|condition|life|
+----------+------+---------+----+
|2020-01-12|    d1|        0|   0|
|2020-01-12|    d2|        0|   0|
|2020-01-13|    d3|        0|   1|
|2020-01-14|    d4|        1|   2|
|2020-01-15|    d5|        0|   1|
|2020-01-15|    d6|        0|   1|
|2020-01-16|    d7|        0|   2|
|2020-01-17|    d8|        0|   3|
|2020-01-18|    d9|        1|   4|
|2020-01-19|   d10|        0|   1|
|2020-01-20|   d11|        0|   2|
+----------+------+---------+----+

目的是计算直到condition=1为止的日期差(天数),然后将日期差重设为从满足最后一个条件开始的天数。 life是尝试计算的列。知道如何计算吗? Windowlag

apache-spark pyspark apache-spark-sql pyspark-sql
2个回答
1
投票

这是一种类型的问题,可以通过添加一些临时行来简化(我们将其标记,然后在以后将其删除)

from pyspark.sql import Window
from pyspark.sql.functions import lit, lag, sum as fsum, first, datediff

((1)首先,创建一个新的数据帧df1,该数据帧复制条件== 1的所有行,但将其条件设置为0且标志= 1,将结果数据帧与原始数据帧合并(设置标志= 0):

df1 = df.withColumn('flag', lit(0)).union(
    df.where('condition = 1').withColumn('condition', lit(0)).withColumn('flag', lit(1))
)

(2)然后,设置以下两个窗口规格,使用w1帮助创建一个子组标签g对所有连续的行进行分组,直到条件从1切换到0。将flag添加到orderBy (),以便新添加的行位于条件= 1的相应行的后面,并分组到下一个组标签中。]

w1 = Window.partitionBy(lit(0)).orderBy('date', 'flag')
w2 = Window.partitionBy(lit(0), 'g').orderBy('date', 'flag')

注意:

如果数据框很大,则可能需要将lit(0)更改为某些实际列或计算列,以避免Spark将所有行移动到单个分区上。 更新:根据注释,数据帧是单个时间序列,可以加载到单个分区上,因此使用lit(0)应该足够。

(3)使用w1上的滞后和求和函数找到子组标签'g',然后使用WindowSpec w2计算同一组中的first_date。此日期用于计算“寿命”列:

df2 = df1.withColumn('g', fsum((lag('condition').over(w1) == 1).astype('int')).over(w1)) \
    .withColumn('first_date', first('date').over(w2)) \
    .withColumn('life', datediff('date','first_date'))
df2.show()
+----------+------+---------+----+---+----------+----+
|      date|device|condition|flag|  g|first_date|life|
+----------+------+---------+----+---+----------+----+
|2020-01-12|    d1|        0|   0|  0|2020-01-12|   0|
|2020-01-12|    d2|        0|   0|  0|2020-01-12|   0|
|2020-01-13|    d3|        0|   0|  0|2020-01-12|   1|
|2020-01-14|    d4|        1|   0|  0|2020-01-12|   2|
|2020-01-14|    d4|        0|   1|  1|2020-01-14|   0|
|2020-01-15|    d5|        0|   0|  1|2020-01-14|   1|
|2020-01-15|    d6|        0|   0|  1|2020-01-14|   1|
|2020-01-16|    d7|        0|   0|  1|2020-01-14|   2|
|2020-01-17|    d8|        0|   0|  1|2020-01-14|   3|
|2020-01-18|    d9|        1|   0|  1|2020-01-14|   4|
|2020-01-18|    d9|        0|   1|  2|2020-01-18|   0|
|2020-01-19|   d10|        0|   0|  2|2020-01-18|   1|
|2020-01-20|   d11|        0|   0|  2|2020-01-18|   2|
+----------+------+---------+----+---+----------+----+

((4)删除临时行和列以获得最终数据框:

df_new = df2.filter('flag = 0').drop('first_date', 'g', 'flag')
df_new.show()
+----------+------+---------+----+
|      date|device|condition|life|
+----------+------+---------+----+
|2020-01-12|    d1|        0|   0|
|2020-01-12|    d2|        0|   0|
|2020-01-13|    d3|        0|   1|
|2020-01-14|    d4|        1|   2|
|2020-01-15|    d5|        0|   1|
|2020-01-15|    d6|        0|   1|
|2020-01-16|    d7|        0|   2|
|2020-01-17|    d8|        0|   3|
|2020-01-18|    d9|        1|   4|
|2020-01-19|   d10|        0|   1|
|2020-01-20|   d11|        0|   2|
+----------+------+---------+----+

0
投票

我尝试以不同的方式提供,它更接近标准sql方言,但仍使用pyspark语法并关注性能影响。

© www.soinside.com 2019 - 2024. All rights reserved.