How to count distinct elements over multiple columns and a rolling window in PySpark

Problem description

Suppose we have the following DataFrame:

port | flag | timestamp
-----+------+--------------------------
20   | S    | 2009-04-24T17:13:14+00:00
30   | R    | 2009-04-24T17:14:14+00:00
32   | S    | 2009-04-24T17:15:14+00:00
21   | R    | 2009-04-24T17:16:14+00:00
54   | R    | 2009-04-24T17:17:14+00:00
24   | R    | 2009-04-24T17:18:14+00:00

I would like to count the number of distinct (port, flag) pairs over the previous 3 hours in PySpark.

The result would be something like:

port | flag | timestamp                 | distinct_port_flag_overs_3h
-----+------+---------------------------+----------------------------
20   | S    | 2009-04-24T17:13:14+00:00 | 1
30   | R    | 2009-04-24T17:14:14+00:00 | 1
32   | S    | 2009-04-24T17:15:14+00:00 | 2
21   | R    | 2009-04-24T17:16:14+00:00 | 2
54   | R    | 2009-04-24T17:17:14+00:00 | 2
24   | R    | 2009-04-24T17:18:14+00:00 | 3

The SQL request would look like:

SELECT
    COUNT(DISTINCT port) OVER my_window AS distinct_port_flag_overs_3h
FROM my_table
WINDOW my_window AS (
    PARTITION BY flag
    ORDER BY CAST(timestamp AS timestamp)
    RANGE BETWEEN INTERVAL 3 HOUR PRECEDING AND CURRENT ROW
)

I found this topic, which solves the problem, but only when counting distinct elements over a single field.

Does anyone have an idea how to achieve this?

  • python 3.7

  • pyspark 2.4.4

pyspark pyspark-sql pyspark-dataframes
1 answer

Just collect a set of (port, flag) structs over the window and take its size. Something like this:

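from pyspark.sql import Window
from pyspark.sql.functions import col, collect_set, size, struct, to_timestamp

# Frame: rows with the same flag whose timestamp falls within the previous 3 hours (10800 s).
w = Window.partitionBy("flag").orderBy("timestamp").rangeBetween(-10800, Window.currentRow)

# Cast the timestamp to epoch seconds so that rangeBetween can use a numeric offset.
df.withColumn("timestamp", to_timestamp("timestamp").cast("long"))\
  .withColumn("distinct_port_flag_overs_3h", size(collect_set(struct("port", "flag")).over(w)))\
  .orderBy(col("timestamp"))\
  .show()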
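
To sanity-check what this window computes without a Spark session, here is a minimal plain-Python sketch of the same semantics. It is not Spark code. The helper name `distinct_pairs_in_window` and the `rows` list are made up for illustration, and the model assumes the frame covers every row with the same flag whose timestamp lies in [t - 10800, t].

```python
from datetime import datetime

# Sample rows from the question: (port, flag, ISO-8601 timestamp).
rows = [
    (20, "S", "2009-04-24T17:13:14+00:00"),
    (30, "R", "2009-04-24T17:14:14+00:00"),
    (32, "S", "2009-04-24T17:15:14+00:00"),
    (21, "R", "2009-04-24T17:16:14+00:00"),
    (54, "R", "2009-04-24T17:17:14+00:00"),
    (24, "R", "2009-04-24T17:18:14+00:00"),
]

WINDOW_SECONDS = 3 * 60 * 60  # 10800, the same bound passed to rangeBetween


def distinct_pairs_in_window(rows, window_seconds=WINDOW_SECONDS):
    """Hypothetical helper: for each row, count the distinct (port, flag)
    pairs among rows with the same flag and a timestamp in [t - window, t]."""
    # Convert each ISO-8601 string to epoch seconds, like cast("long") in Spark.
    parsed = [
        (port, flag, int(datetime.fromisoformat(ts).timestamp()))
        for port, flag, ts in rows
    ]
    counts = []
    for _, flag, t in parsed:
        # Set of pairs visible in this row's frame (same partition, in range).
        seen = {
            (p, f)
            for p, f, other_t in parsed
            if f == flag and t - window_seconds <= other_t <= t
        }
        counts.append(len(seen))
    return counts


counts = distinct_pairs_in_window(rows)
print(counts)
```

On the sample rows this model gives 1, 1, 2, 2, 3, 4 in input order: all six timestamps fall within five minutes of each other, so each R row sees every earlier R row. The table in the question is only an approximate illustration, so its last two values differ.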