I have a PySpark dataframe with two columns, arrival and departure. The idea is to count the number of departure events that fall inside a window computed from each row's arrival time. For example, if an item arrives at 23:00, I want to take the -12-hour window [11:00, 23:00] and count the items that departed within that interval.

Here is the code I wrote to do this. As you can see, it does not work, because a range window can only be ordered by a single column, so I can aggregate over the arrival_time column or over the dept_time column, but not relate one to the other.
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.functions import col
from pyspark.sql.window import Window
spark = SparkSession.builder.appName("rolling_window_example").getOrCreate()
# Sample data
data = [
("2024-05-10 02:00:00", "2024-05-10 21:30:00", 0, 1),
("2024-05-12 14:10:00", "2024-05-13 02:00:00", 1, 1),
("2024-05-05 03:00:00", "2024-05-14 03:30:00", 2, 2),
("2024-05-14 01:32:00", "2024-05-14 23:30:00", 0, 2),
("2024-05-14 01:00:00", "2024-05-15 01:30:00", 0, 1)
]
columns = ["dept_time", "arrival_time", "ground_truth_12", "ground_truth_24"]
# Create DataFrame
df = spark.createDataFrame(data, columns)
df = df.withColumn("dept_timestamp", col("dept_time").cast("timestamp"))
df = df.withColumn("arrival_timestamp", col("arrival_time").cast("timestamp"))
df = df.withColumn("dept_time", col("dept_time").cast("timestamp").cast("long"))
df = df.withColumn("arrival_time", col("arrival_time").cast("timestamp").cast("long"))
# Calculate windows wrt arrival time
window_12 = Window.partitionBy().orderBy("arrival_time").rangeBetween(-12 * 3600, Window.currentRow)
window_24 = Window.partitionBy().orderBy("arrival_time").rangeBetween(-24 * 3600, Window.currentRow)
df_rolling = df \
.orderBy("dept_time") \
.withColumn("t_12_count", f.count("dept_time").over(window_12)) \
.withColumn("t_24_count", f.count("dept_time").over(window_24))
# Show results
display(df_rolling)
The output is not as expected:
+----------+------------+---------------+---------------+-------------------+-------------------+----------+----------+
| dept_time|arrival_time|ground_truth_12|ground_truth_24| dept_timestamp| arrival_timestamp|t_12_count|t_24_count|
+----------+------------+---------------+---------------+-------------------+-------------------+----------+----------+
|1715306400| 1715376600| 0| 1|2024-05-10 02:00:00|2024-05-10 21:30:00| 1| 1|
|1715523000| 1715565600| 1| 1|2024-05-12 14:10:00|2024-05-13 02:00:00| 1| 1|
|1714878000| 1715657400| 2| 2|2024-05-05 03:00:00|2024-05-14 03:30:00| 1| 1|
|1715650320| 1715729400| 0| 2|2024-05-14 01:32:00|2024-05-14 23:30:00| 1| 2|
|1715648400| 1715736600| 0| 1|2024-05-14 01:00:00|2024-05-15 01:30:00| 2| 3|
+----------+------------+---------------+---------------+-------------------+-------------------+----------+----------+
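For reference, the ground-truth columns can be reproduced with a brute-force check in plain Python over the sample data above (a sketch; `departures_within` is only an illustrative helper, not part of the Spark code). It makes explicit what the window should count: for each row's arrival, the departures of *any* row inside [arrival - N hours, arrival], which is exactly what a single range window ordered by arrival_time cannot see.

```python
from datetime import datetime, timedelta

# (dept_time, arrival_time) pairs from the sample data above
rows = [
    ("2024-05-10 02:00:00", "2024-05-10 21:30:00"),
    ("2024-05-12 14:10:00", "2024-05-13 02:00:00"),
    ("2024-05-05 03:00:00", "2024-05-14 03:30:00"),
    ("2024-05-14 01:32:00", "2024-05-14 23:30:00"),
    ("2024-05-14 01:00:00", "2024-05-15 01:30:00"),
]
fmt = "%Y-%m-%d %H:%M:%S"
parsed = [(datetime.strptime(d, fmt), datetime.strptime(a, fmt)) for d, a in rows]

def departures_within(hours):
    # For each row's arrival, count departures (from any row) in [arrival - hours, arrival]
    return [
        sum(1 for dep, _ in parsed if arr - timedelta(hours=hours) <= dep <= arr)
        for _, arr in parsed
    ]

print(departures_within(12))  # [0, 1, 2, 0, 0] == ground_truth_12
print(departures_within(24))  # [1, 1, 2, 2, 1] == ground_truth_24
```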
The expected output is shown in the columns ground_truth_12 and ground_truth_24, for the 12-hour and 24-hour windows respectively.
Here is a neat trick to count all departures relative to each row's arrival:

1. Tag each time with a flag for arrival ("A") or departure ("D").
2. Union the two dataframes.
3. Sort the unioned dataframe by time, regardless of the flag.
4. Create a window spec that counts every "D" row that appears in the window from -12 hours to 0 seconds (the current time).
5. Do the same for the -24-hour window.

Important note: the second row's ground truth in the question contains an error, which is corrected in the sample data below.

Here is a working example.
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *
spark = SparkSession.builder.appName("SelfJoinExample").getOrCreate()
## There is error in the second row ground truth which I have corrected below.
data = [
("2024-05-10 02:00:00", "2024-05-10 21:30:00", 0, 1),
("2024-05-12 12:00:00", "2024-05-13 02:00:00", 0, 1),
("2024-05-05 03:00:00", "2024-05-14 03:30:00", 2, 2),
("2024-05-14 01:32:00", "2024-05-14 23:30:00", 0, 2),
("2024-05-14 01:00:00", "2024-05-15 01:30:00", 0, 1)
]
columns = ["departure_time", "arrival_time", "ground_truth_12", "ground_truth_24"]
# Create DataFrame
df = spark.createDataFrame(data, columns)
df = df.withColumn("departure_timestamp", col("departure_time").cast("timestamp")).drop("departure_time")
df = df.withColumn("arrival_timestamp", col("arrival_time").cast("timestamp")).drop("arrival_time")
# Give each row a unique id so events can be traced back to their source row
df = df.withColumn("mono_id", monotonically_increasing_id())
# Pack (timestamp, flag, id) into an array so arrivals and departures share one schema
df = df.withColumn("arrival_label", array(col("arrival_timestamp"), lit("A"), col("mono_id")))
df = df.withColumn("dept_label", array(col("departure_timestamp"), lit("D"), col("mono_id")))
df_arrival = df.select(col("arrival_label").alias("common_name"))
df_dept = df.select(col("dept_label").alias("common_name"))
# One event stream: every arrival and departure, interleaved
df_union = df_arrival.union(df_dept)
df_union = df_union.orderBy(col("common_name")[0])
df_union.show(n=1000, truncate=False)
df_mid = df_union.withColumn("timestamp", to_timestamp(df_union["common_name"][0]))
df_mid = df_mid.withColumn("long_ts", col("timestamp").cast("long"))
df_mid = df_mid.withColumn("type", df_union["common_name"][1])
df_mid = df_mid.withColumn("value", df_union["common_name"][2].cast(LongType()))
df_mid = df_mid.drop("common_name")
# 1 for departure events, 0 for arrivals, so a windowed sum counts only departures
df_mid = df_mid.withColumn("dept_flag", when(col("type") == "D", 1).otherwise(0))
df_mid.show(truncate=False)
# Range windows over the unified event time: [-12h, now] and [-24h, now]
windowSpec12 = Window.orderBy("long_ts").rangeBetween(-12 * 3600, 0)
windowSpec24 = Window.orderBy("long_ts").rangeBetween(-24 * 3600, 0)
df_int = df_mid.withColumn("calc_t12", sum("dept_flag").over(windowSpec12))
df_int = df_int.withColumn("calc_t24", sum("dept_flag").over(windowSpec24))
df_int.show(n=1000, truncate=False)
df_crosscheck = df.join(df_int, on=[col("type") == "A", col("mono_id") == col("value")], how="inner")
print("Final Result")
df_crosscheck.select("ground_truth_12", "ground_truth_24", "calc_t12", "calc_t24").show(n=1000, truncate=False)
The final cross-check dataframe:
+---------------+---------------+--------+--------+
|ground_truth_12|ground_truth_24|calc_t12|calc_t24|
+---------------+---------------+--------+--------+
|0 |1 |0 |1 |
|0 |1 |0 |1 |
|2 |2 |2 |2 |
|0 |2 |0 |2 |
|0 |1 |0 |1 |
+---------------+---------------+--------+--------+
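The same numbers can be cross-checked outside Spark: sort all departure times once and, for each arrival, count the departures in the window with binary search. This is a sketch over the corrected sample data; `count_departures` is an illustrative helper, not part of the Spark code.

```python
from bisect import bisect_left, bisect_right
from datetime import datetime, timedelta

# (departure_time, arrival_time) pairs from the corrected sample data
rows = [
    ("2024-05-10 02:00:00", "2024-05-10 21:30:00"),
    ("2024-05-12 12:00:00", "2024-05-13 02:00:00"),
    ("2024-05-05 03:00:00", "2024-05-14 03:30:00"),
    ("2024-05-14 01:32:00", "2024-05-14 23:30:00"),
    ("2024-05-14 01:00:00", "2024-05-15 01:30:00"),
]
fmt = "%Y-%m-%d %H:%M:%S"
dep_times = sorted(datetime.strptime(d, fmt) for d, _ in rows)

def count_departures(arrival, hours):
    # Departures in [arrival - hours, arrival], inclusive on both ends
    lo = bisect_left(dep_times, arrival - timedelta(hours=hours))
    hi = bisect_right(dep_times, arrival)
    return hi - lo

arrivals = [datetime.strptime(a, fmt) for _, a in rows]
print([count_departures(a, 12) for a in arrivals])  # [0, 0, 2, 0, 0]
print([count_departures(a, 24) for a in arrivals])  # [1, 1, 2, 2, 1]
```

This matches ground_truth_12 and ground_truth_24 above and is an easy way to sanity-check the Spark result on larger samples.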
The full output of the intermediate steps:
+--------------------------------------+
|common_name |
+--------------------------------------+
|[2024-05-05 03:00:00, D, 94489280512] |
|[2024-05-10 02:00:00, D, 25769803776] |
|[2024-05-10 21:30:00, A, 25769803776] |
|[2024-05-12 12:00:00, D, 60129542144] |
|[2024-05-13 02:00:00, A, 60129542144] |
|[2024-05-14 01:00:00, D, 163208757248]|
|[2024-05-14 01:32:00, D, 128849018880]|
|[2024-05-14 03:30:00, A, 94489280512] |
|[2024-05-14 23:30:00, A, 128849018880]|
|[2024-05-15 01:30:00, A, 163208757248]|
+--------------------------------------+
+-------------------+----------+----+------------+---------+
|timestamp |long_ts |type|value |dept_flag|
+-------------------+----------+----+------------+---------+
|2024-05-05 03:00:00|1714858200|D |94489280512 |1 |
|2024-05-10 02:00:00|1715286600|D |25769803776 |1 |
|2024-05-10 21:30:00|1715356800|A |25769803776 |0 |
|2024-05-12 12:00:00|1715495400|D |60129542144 |1 |
|2024-05-13 02:00:00|1715545800|A |60129542144 |0 |
|2024-05-14 01:00:00|1715628600|D |163208757248|1 |
|2024-05-14 01:32:00|1715630520|D |128849018880|1 |
|2024-05-14 03:30:00|1715637600|A |94489280512 |0 |
|2024-05-14 23:30:00|1715709600|A |128849018880|0 |
|2024-05-15 01:30:00|1715716800|A |163208757248|0 |
+-------------------+----------+----+------------+---------+
+-------------------+----------+----+------------+---------+--------+--------+
|timestamp |long_ts |type|value |dept_flag|calc_t12|calc_t24|
+-------------------+----------+----+------------+---------+--------+--------+
|2024-05-05 03:00:00|1714858200|D |94489280512 |1 |1 |1 |
|2024-05-10 02:00:00|1715286600|D |25769803776 |1 |1 |1 |
|2024-05-10 21:30:00|1715356800|A |25769803776 |0 |0 |1 |
|2024-05-12 12:00:00|1715495400|D |60129542144 |1 |1 |1 |
|2024-05-13 02:00:00|1715545800|A |60129542144 |0 |0 |1 |
|2024-05-14 01:00:00|1715628600|D |163208757248|1 |1 |1 |
|2024-05-14 01:32:00|1715630520|D |128849018880|1 |2 |2 |
|2024-05-14 03:30:00|1715637600|A |94489280512 |0 |2 |2 |
|2024-05-14 23:30:00|1715709600|A |128849018880|0 |0 |2 |
|2024-05-15 01:30:00|1715716800|A |163208757248|0 |0 |1 |
+-------------------+----------+----+------------+---------+--------+--------+