How to iterate over the rows of grouped data in PySpark


My data looks like this:

| id | location | type | date       | time  |
|----|----------|------|------------|-------|
| 1  | 33       | out  | 2020-11-03 | 08:35 |
| 1  | 34       | in   | 2020-11-03 | 08:37 |
| 1  | 33       | in   | 2020-11-03 | 09:40 |
| 1  | 33       | out  | 2020-11-03 | 10:35 |
| 1  | 33       | in   | 2020-11-03 | 12:40 |
| 1  | 33       | out  | 2020-11-03 | 18:35 |
| 2  | 33       | out  | 2020-11-03 | 11:35 |
| 2  | 33       | in   | 2020-11-03 | 18:35 |
| 2  | 33       | both | 2020-11-03 | 20:35 |

The logic is as follows:

  1. Group the records by id, location, and date
  2. Loop through the group's records and find the first "in" or "both" record and its time
  3. Loop through the remaining records in the group and find the next "out" or "both" record and its time
  4. A "both" record can act as either "in" or "out": if there is no unpaired "in" before it, treat it as "in"; if an unpaired "in" exists before it, treat it as "out" (see the sketch after this list)
  5. Collect the paired records; if a group contains two in/out pairs, output them as two rows
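
To make step 4 concrete, here is a minimal plain-Python sketch of the pairing logic for one group's time-sorted rows (pair_events is just an illustrative helper name, not part of my actual code):

def pair_events(rows):
    # rows: (type, time) tuples for one group, already sorted by time
    pairs, time_in = [], None
    for typ, time in rows:
        if time_in is None:
            if typ in ("in", "both"):     # no unpaired "in" yet: "both" acts as "in"
                time_in = time
        elif typ in ("out", "both"):      # an unpaired "in" exists: "both" acts as "out"
            pairs.append((time_in, time))
            time_in = None
    return pairs

# Group (2, 33, 2020-11-03): the "both" at 20:35 closes the "in" at 18:35
print(pair_events([("out", "11:35"), ("in", "18:35"), ("both", "20:35")]))
# -> [('18:35', '20:35')]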

How can I implement this in PySpark?

I wrote a script, but it takes a very long time to compute. I assume there must be a more efficient way.

My code:

# df is my sample data above

_intype = ("in", "both")
_outtype = ("out", "both")

# NOTE: this collects one row per distinct (id, location, date) and then runs a
# separate filter + collect (i.e. a separate Spark job) for every group,
# which is why it is slow on large data.
lst = df.select("id", "location", "date").distinct().collect()

lst_agg = []
for record in lst:
    id_val = record.id
    location_val = record.location
    date_val = record.date

    # Pull this group's rows to the driver, ordered by time
    records = (df
               .filter((df.id == id_val) & (df.location == location_val) & (df.date == date_val))
               .orderBy("time")
               .collect())

    pre_type = None   # reset the state machine for every group
    time_in = None
    for row in records:
        if pre_type is None or pre_type == 'OUT':
            if row.type in _intype:       # first "in"/"both" opens a pair
                time_in = row.time
                pre_type = 'IN'
        elif pre_type == 'IN':
            if row.type in _outtype:      # next "out"/"both" closes the pair
                lst_agg.append((id_val, location_val, date_val, time_in, row.time))
                time_in = None
                pre_type = 'OUT'

df_agg_cols = ["id", "location", "date", "time_in", "time_out"]
df_agg = spark.createDataFrame(data=lst_agg, schema=df_agg_cols)

I want to pair the records for the same id, location, and date like this:
| id | location | date       | time_in | time_out |
|----|----------|------------|---------|----------|
| 1  | 33       | 2020-11-03 | 08:37   | 10:35    |
| 1  | 33       | 2020-11-03 | 12:40   | 18:35    |
| 2  | 33       | 2020-11-03 | 18:35   | 20:35    |
pyspark
1 Answer

Here is a quick example based on your input that produces the desired output. Note, however, that it does not perform the loops you listed in the question: the solution splits the dataset into two parts ("in" and "out"), then groups and joins on the appropriate columns.

If this does not solve your problem, please provide more sample data along with the expected output.

import pyspark.sql.functions as F

df = spark.sql("""
               SELECT 1 id, 33 location, "out" type, CAST("2020-11-03" as date) date, "08:35" time UNION ALL 
               SELECT 1 id, 33 location, "in" type, CAST("2020-11-03" as date) date, "08:37" time UNION ALL 
               SELECT 1 id, 33 location, "in" type, CAST("2020-11-03" as date) date, "09:40" time UNION ALL 
               SELECT 1 id, 33 location, "out" type, CAST("2020-11-03" as date) date, "10:35" time UNION ALL 
               SELECT 2 id, 33 location, "out" type, CAST("2020-11-03" as date) date, "11:35" time UNION ALL 
               SELECT 2 id, 33 location, "in" type, CAST("2020-11-03" as date) date, "18:35" time UNION ALL 
               SELECT 2 id, 33 location, "both" type, CAST("2020-11-03" as date) date, "20:35" time
               """)

# Earliest "in"/"both" time per (id, location, date)
df_in = (df
         .filter("type in ('in', 'both')")
         .groupBy("id", "location", "date")
         .agg(F.min("time").alias("time_in"))
         )

# Latest "out"/"both" time per (id, location, date)
df_out = (df
          .filter("type in ('out', 'both')")
          .groupBy("id", "location", "date")
          .agg(F.max("time").alias("time_out"))
          )

# Inner join keeps only groups that have both an "in" side and an "out" side
df_final = (df_in
            .join(df_out, on=["id", "location", "date"])
            )

df_final.show()

Output:

+---+--------+----------+-------+--------+
| id|location|      date|time_in|time_out|
+---+--------+----------+-------+--------+
|  1|      33|2020-11-03|  08:37|   10:35|
|  2|      33|2020-11-03|  18:35|   20:35|
+---+--------+----------+-------+--------+
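
If you need every in/out pair per group rather than just the earliest "in" and latest "out" (your expected output has two rows for id 1), one option is to run the pairing loop per group with groupBy().applyInPandas. Below is a minimal sketch, assuming Spark 3.0+ with pandas and PyArrow installed; pair_group is just an illustrative name:

import pandas as pd

def pair_group(pdf: pd.DataFrame) -> pd.DataFrame:
    # Called once per (id, location, date) group with that group's rows as a pandas DataFrame
    pdf = pdf.sort_values("time")
    pairs, time_in = [], None
    for row in pdf.itertuples():
        if time_in is None:
            if row.type in ("in", "both"):    # "both" acts as "in" when no "in" is pending
                time_in = row.time
        elif row.type in ("out", "both"):     # "both" acts as "out" when an "in" is pending
            pairs.append((row.id, row.location, row.date, time_in, row.time))
            time_in = None
    return pd.DataFrame(pairs, columns=["id", "location", "date", "time_in", "time_out"])

df_agg = (df.groupBy("id", "location", "date")
            .applyInPandas(pair_group, schema="id int, location int, date date, time_in string, time_out string"))
df_agg.show()

Unlike the per-group filter and collect loop in the question, this keeps the pairing logic intact while Spark distributes the groups across executors in a single job.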