如何检测pyspark数据框列中的模式何时发生变化

问题描述 投票:1回答:1

我有如下数据框:

|DateTime                  |UID                   |result    

|2020-02-23 11:42:34       |0000111D              |30         
|2020-02-24 11:47:34       |0000111D              |30         
|2020-02-24 11:48:34       |0000111D              |29         
|2020-02-24 11:49:34       |0000111D              |29         
|2020-02-24 11:50:34       |0000111D              |28         
|2020-02-25 11:52:34       |0000111D              |28         
|2020-02-25 11:12:35       |0000111D              |27         
|2020-02-26 11:34:35       |0000111D              |27         
|2020-02-27 11:12:35       |0000111D              |2A         
|2020-02-28 11:43:35       |0000111D              |2A         
|2020-03-01 11:23:35       |0000111D              |30         
|2020-03-02 11:10:35       |0000111D              |30         
|2020-03-03 11:07:35       |0000111D              |29         
|2020-03-04 11:31:35       |0000111D              |29         
|2020-03-05 11:07:35       |0000111D              |28         
|2020-03-06 11:17:35       |0000111D              |28         
|2020-03-07 11:15:47       |0000111D              |27         
|2020-03-08 11:34:09       |0000111D              |27         
|2020-03-09 11:15:47       |0000111D              |26         
|2020-03-10 04:12:45       |0000111D              |26         
|2020-03-11 11:15:47       |0000111D              |2A         
|2020-03-12 07:34:09       |0000111D              |2A        

我想要在每个周期中更改结果值时获取'DateTime'值。因此,基本上每个UID的周期是30到2A。现在,在某些情况下,像第一个循环一样,这里出现[[可能会丢失数据]],结果'26'没有数据,并且每次更改都必须取最后一次出现,除了第一个记录每个周期基于此,我想要这样的输出:

UID start_point 1st_change 2nd_change 3rd change 4th_change 5th change 0000111D 2020-02-23 11:42:34 2020-02-24 11:49:34 2020-02-25 11:52:34 2020-02-26 11:34:35 datamiss 2020-02-28 11:43:35 0000111D 2020-03-01 11:23:35 2020-03-04 11:31:35 2020-03-06 11:17:35 2020-03-08 11:34:09 2020-03-10 04:12:45 2020-03-12 07:34:09

考虑到我必须对每个传感器ID多次执行并且数据集具有1000k记录的情况,我如何以最有效的方式执行此操作。

到目前为止,我已经做到了这一点,但未能做到正确,无法处理缺少数据时的动态性

w = Window.orderBy("DateTime") df_temp1=df.withColumn("rn",row_number().over(w)).\ withColumn("lead",lead(col("result"),1).over(w)).\ withColumn("lag",lag(col("result"),1).over(w)).withColumn("mismatch_bool",when((col('lead') != col('lag')),lit("true")).otherwise(lit("False")))

基于此,我想要这样的输出:

sensorid start_point 1st_change 2nd_change 3rd chnage 4th_change 5th chnage 0000126D 2020-02-23 11:42:34 2020-02-24 11:49:34 2020-02-25 11:52:34 2020-02-26 11:34:35 2020-02-28 11:43:35 null 0000126D 2020-03-01 11:23:35 2020-03-04 11:31:35 2020-03-06 11:17:35 2020-03-08 09:34:09 2020-03-10 11:34:09 2020-03-08 07:34:09

考虑到我必须对每个传感器ID多次执行并且数据集具有1000k记录的情况,我如何以最有效的方式执行此操作。

到目前为止,我能够做到这一点。

w = Window.orderBy("DateTime") df_temp1=df_records_indiv_sensor.withColumn("rn",row_number().over(w)).\ withColumn("lead",lead(col("result"),1).over(w)).\ withColumn("lag",lag(col("result"),1).over(w)).withColumn("mismatch_bool",when((col('lead') != col('lag')),lit("true")).otherwise(lit("False")))

我有一个如下数据框:| DateTime | UID |结果| 2020-02-23 11:42:34 | 0000111D | 30 | 2020-02-24 11:47:34 | 0000111D ...
python dataframe pyspark pyspark-sql
1个回答
0
投票

Spark2.4 only.

© www.soinside.com 2019 - 2024. All rights reserved.