如何扫描列以在Pyspark DataFrame中获取新列

问题描述 投票:0回答:1

我有一个Pyspark DataFrame有两列:sendtime和charge_state,如果charge_state从“off”变为“on”,则新的充电周期开始。

现在我想标记每个充电周期以给我输出。

输入:

+-------------------+------------+
|           sendtime|charge_state|
+-------------------+------------+
|2018-03-02 08:00:00|          on|
...
|2018-03-02 09:42:32|          on|
|2018-03-02 09:42:33|          on|
|2018-03-02 09:42:34|          on|
|2018-03-02 09:42:35|         off|
|2018-03-02 09:42:36|         off|
...
|2018-03-02 10:11:12|         off|
|2018-03-02 10:11:13|          on|
|2018-03-02 10:11:14|          on|
...

输出:

+-------------------+------------+---------------+
|           sendtime|charge_state|charge_cycle_ID|
+-------------------+------------+---------------+
|2018-03-02 08:00:00|          on|             c1|
...
|2018-03-02 09:42:32|          on|             c1|
|2018-03-02 09:42:33|          on|             c1|
|2018-03-02 09:42:34|          on|             c1|
|2018-03-02 09:42:35|         off|             c1|
|2018-03-02 09:42:36|         off|             c1|
...
|2018-03-02 10:11:12|         off|             c1|
|2018-03-02 10:11:13|          on|             c2|
|2018-03-02 10:11:14|          on|             c2|
...
apache-spark pyspark
1个回答
0
投票

您可以使用Window函数执行此任务:

from pyspark.sql import functions as F
from pyspark.sql import Window

df.withColumn(
    'charge_state_lag', 
    F.lag('charge_state').over(Window.partitionBy().orderBy('sendtime'))
).withColumn(
    'fg', 
    F.when((F.col("charge_state")=="on")&(F.col("charge_state_lag")=="off"),1).otherwise(0)
).select(
    'sendtime',
    'charge_state',
    F.concat(
        F.lit('C'),
        (F.sum('fg').over(Window.partitionBy().orderBy('sendtime'))+1).cast('string')
    ).alias("charge_cycle_ID")
).show()
© www.soinside.com 2019 - 2024. All rights reserved.