My data looks like this:

| id | location | type | date       | time  |
|----|----------|------|------------|-------|
| 1  | 33       | out  | 2020-11-03 | 08:35 |
| 1  | 34       | in   | 2020-11-03 | 08:37 |
| 1  | 33       | in   | 2020-11-03 | 09:40 |
| 1  | 33       | out  | 2020-11-03 | 10:35 |
| 1  | 33       | in   | 2020-11-03 | 12:40 |
| 1  | 33       | out  | 2020-11-03 | 18:35 |
| 2  | 33       | out  | 2020-11-03 | 11:35 |
| 2  | 33       | in   | 2020-11-03 | 18:35 |
| 2  | 33       | both | 2020-11-03 | 20:35 |
The logic is as follows: for each id, location, and date, pair each "in" record with the next "out" record ("both" counts as either). How can I implement this in PySpark? I wrote a script, but it takes a very long time to run; I'm sure there must be a more efficient way.

My code:
`df` is my sample data above:

```python
_intype = ("in", "both")
_outtype = ("out", "both")

lst = df.select("id", "location", "date").distinct().collect()
lst_agg = []

for record in lst:
    id_val = record.id
    location_val = record.location
    date_val = record.date
    # pull all rows for this (id, location, date) group to the driver
    records = df.filter(
        (df.id == id_val) & (df.location == location_val) & (df.date == date_val)
    ).collect()
    pre_type = None
    time_in = None
    time_out = None
    for row in records:
        if pre_type is None or pre_type == "OUT":
            if row.type in _intype:
                time_in = row.time
                time_out = None
                pre_type = "IN"
        elif pre_type == "IN":
            if row.type in _outtype:
                time_out = row.time
                pre_type = "OUT"
        if time_out is not None:
            lst_agg.append((id_val, location_val, date_val, time_in, time_out))
            time_in = None
            time_out = None

df_agg_cols = ["id", "location", "date", "time_in", "time_out"]
df_agg = spark.createDataFrame(data=lst_agg, schema=df_agg_cols)
```
I want to pair the records for the same id, location and date like this:

| id | location | date       | time_in | time_out |
|----|----------|------------|---------|----------|
| 1  | 33       | 2020-11-03 | 08:37   | 10:35    |
| 1  | 33       | 2020-11-03 | 12:40   | 18:35    |
| 2  | 33       | 2020-11-03 | 18:35   | 20:35    |
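For reference, the pairing rule above can be checked in plain Python, independent of Spark (a minimal sketch; the `pair_visits` helper is illustrative and not part of my script):

```python
from itertools import groupby

IN_TYPES = {"in", "both"}    # "both" can open a visit
OUT_TYPES = {"out", "both"}  # "both" also closes one

def pair_visits(rows):
    """Pair the first 'in' with the next 'out' per (id, location, date).

    rows: iterable of (id, location, type, date, time) tuples, assumed
    already sorted by (id, location, date, time) so groups are contiguous.
    """
    pairs = []
    for key, group in groupby(rows, key=lambda r: (r[0], r[1], r[3])):
        time_in = None
        for _, _, typ, _, time in group:
            if time_in is None and typ in IN_TYPES:
                time_in = time                     # open a visit
            elif time_in is not None and typ in OUT_TYPES:
                pairs.append((*key, time_in, time))  # close it
                time_in = None
    return pairs

rows = [
    (1, 33, "out",  "2020-11-03", "08:35"),
    (1, 33, "in",   "2020-11-03", "08:37"),
    (1, 33, "in",   "2020-11-03", "09:40"),
    (1, 33, "out",  "2020-11-03", "10:35"),
    (1, 33, "in",   "2020-11-03", "12:40"),
    (1, 33, "out",  "2020-11-03", "18:35"),
    (2, 33, "out",  "2020-11-03", "11:35"),
    (2, 33, "in",   "2020-11-03", "18:35"),
    (2, 33, "both", "2020-11-03", "20:35"),
]
print(pair_visits(rows))
# → [(1, 33, '2020-11-03', '08:37', '10:35'),
#    (1, 33, '2020-11-03', '12:40', '18:35'),
#    (2, 33, '2020-11-03', '18:35', '20:35')]
```

This matches the expected output table above; a repeated "in" inside an open visit is ignored, as in row 09:40 for id 1.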
Here's a quick example based on your input that produces the desired output. Note that it does not perform the loop you listed in your question; instead, it splits the dataset into two parts (in and out), then groups and joins on the key columns.

If this doesn't solve your problem, please provide more sample data together with the expected output.
```python
import pyspark.sql.functions as F

df = spark.sql("""
    SELECT 1 id, 33 location, "out" type, CAST("2020-11-03" as date) date, "08:35" time UNION ALL
    SELECT 1 id, 33 location, "in" type, CAST("2020-11-03" as date) date, "08:37" time UNION ALL
    SELECT 1 id, 33 location, "in" type, CAST("2020-11-03" as date) date, "09:40" time UNION ALL
    SELECT 1 id, 33 location, "out" type, CAST("2020-11-03" as date) date, "10:35" time UNION ALL
    SELECT 2 id, 33 location, "out" type, CAST("2020-11-03" as date) date, "11:35" time UNION ALL
    SELECT 2 id, 33 location, "in" type, CAST("2020-11-03" as date) date, "18:35" time UNION ALL
    SELECT 2 id, 33 location, "both" type, CAST("2020-11-03" as date) date, "20:35" time
""")

df_in = (df
    .filter("type in ('in', 'both')")
    .groupBy("id", "location", "date")
    .agg(F.min("time").alias("time_in"))
)

df_out = (df
    .filter("type in ('out', 'both')")
    .groupBy("id", "location", "date")
    .agg(F.max("time").alias("time_out"))
)

df_final = df_in.join(df_out, on=["id", "location", "date"])

df_final.show()
```
Output:

```
+---+--------+----------+-------+--------+
| id|location|      date|time_in|time_out|
+---+--------+----------+-------+--------+
|  1|      33|2020-11-03|  08:37|   10:35|
|  2|      33|2020-11-03|  18:35|   20:35|
+---+--------+----------+-------+--------+
```