假设我们有以下数据框:
port | flag | timestamp
---------------------------------------
20 | S | 2009-04-24T17:13:14+00:00
30 | R | 2009-04-24T17:14:14+00:00
32 | S | 2009-04-24T17:15:14+00:00
21 | R | 2009-04-24T17:16:14+00:00
54 | R | 2009-04-24T17:17:14+00:00
24 | R | 2009-04-24T17:18:14+00:00
我想计算3天在Pyspark中不同的port, flag
的数量。
结果将是类似:
port | flag | timestamp | distinct_port_flag_overs_3h
---------------------------------------
20 | S | 2009-04-24T17:13:14+00:00 | 1
30 | R | 2009-04-24T17:14:14+00:00 | 1
32 | S | 2009-04-24T17:15:14+00:00 | 2
21 | R | 2009-04-24T17:16:14+00:00 | 2
54 | R | 2009-04-24T17:17:14+00:00 | 2
24 | R | 2009-04-24T17:18:14+00:00 | 3
SQL请求看起来像:
SELECT
COUNT(DISTINCT port) OVER my_window AS distinct_port_flag_overs_3h
FROM my_table
WINDOW my_window AS (
PARTITION BY flag
ORDER BY CAST(timestamp AS timestamp)
RANGE BETWEEN INTERVAL 3 HOUR PRECEDING AND CURRENT
)
我发现this topic解决了这个问题,但前提是我们想在一个字段中计算不同的元素。
有人对如何实现这一目标有任何想法吗?
python 3.7
pyspark 2.4.4
只需收集结构集(port, flag)
并获取其大小。像这样的东西:
w = Window.partitionBy("flag").orderBy("timestamp").rangeBetween(-10800, Window.currentRow)
df.withColumn("timestamp", to_timestamp("timestamp").cast("long"))\
.withColumn("distinct_port_flag_overs_3h", size(collect_set(struct("port", "flag")).over(w)))\
.orderBy(col("timestamp"))\
.show()