How do I generate session windows in a PySpark streaming query?


The following code works as expected in batch, but fails for a streaming query with the infamous AnalysisException:

Non-time-based windows are not supported on streaming DataFrames/Datasets

from pyspark.sql.functions import *
from pyspark.sql.window import Window

temp = [
  ('Alice', 1),
  ('Alice', 60),
  ('Alice', 160),
  ('Alice', 1111),
  ('Alice', 1111),
  ('Alice', 1111),
  ('Alice', 1111),
  ('Alice', 1111),
  ('Alice', 1111),
  ('Alice', 1112),
  ('Bob', 3),
  ('Alice', 2),
  ('Bob', 2),
  ('Alice', 3),
  ('Bob', 1)
]

temp_df = spark.createDataFrame(temp, ["user", "ev_timestamp"])

maxSessionDuration = 60 * 10 # Max session duration of 10 minutes.
client_fp_time_window = Window.partitionBy("user").orderBy("ev_timestamp")

# Look up the previous event time per user, flag gaps >= maxSessionDuration as new
# sessions, then number the sessions with a running sum over the same window.
rowsWithSessionIds = temp_df \
    .select("user", "ev_timestamp", lag("ev_timestamp", 1).over(client_fp_time_window).alias("prevEvTimestamp")) \
    .select("user", "ev_timestamp",
            when(
              (col("ev_timestamp").cast('long') - col("prevEvTimestamp").cast('long')) < maxSessionDuration, 0)
            .otherwise(1).alias("isNewSession")
    ) \
    .select("user", "ev_timestamp", sum("isNewSession").over(client_fp_time_window).alias("sessionId"))

display(rowsWithSessionIds)


sessionsDF = rowsWithSessionIds \
  .groupBy("user", "sessionId") \
  .agg(min("ev_timestamp").alias("startTime"), max("ev_timestamp").alias("endTime"), count("*").alias("count")) \
  .alias('Session')

display(sessionsDF)
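
The failure shows up as soon as the same window logic is applied to a streaming DataFrame. A minimal sketch of the reproduction, assuming a hypothetical streaming table named events with the same schema (the table name and sink are assumptions; any streaming source behaves the same way):

# Hypothetical reproduction; reuses client_fp_time_window from above.
# Requires Spark >= 3.1 for readStream.table().
stream_df = spark.readStream.table("events")  # columns: user, ev_timestamp

streaming_rows = stream_df.select(
    "user", "ev_timestamp",
    lag("ev_timestamp", 1).over(client_fp_time_window).alias("prevEvTimestamp"),
)

# Starting a query over this plan raises:
# AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets
streaming_rows.writeStream.format("memory").queryName("will_fail").start()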

I understand this is because the lag() function is not supported in streaming queries. The recommended alternative is therefore the mapGroupsWithState() method, but that is only available in Scala/Java.
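
Since Spark 3.4, PySpark does expose a counterpart of that API, DataFrame.groupBy(...).applyInPandasWithState(). Below is a minimal sketch of gap-based sessionization with it, assuming a hypothetical streaming DataFrame named events with the same user/ev_timestamp columns; open sessions stay in state until their gap is exceeded (a real job would also set a timeout to flush them), so treat it as an outline rather than a drop-in answer.

# Sketch only: `events` is a hypothetical streaming DataFrame with columns
# user (string) and ev_timestamp (long, seconds). Requires Spark >= 3.4.
import pandas as pd
from pyspark.sql.streaming.state import GroupStateTimeout

GAP = 60 * 10  # same 10-minute gap as maxSessionDuration above

def track_sessions(key, pdfs, state):
    user = key[0]
    # State keeps (session start, last event time, event count) for the open session.
    start, last, cnt = state.get if state.exists else (None, None, 0)

    closed = []  # sessions whose gap was exceeded within this micro-batch
    for pdf in pdfs:
        for ts in sorted(pdf["ev_timestamp"].tolist()):
            if last is not None and ts - last >= GAP:
                closed.append((user, start, last, cnt))
                start, cnt = None, 0
            if start is None:
                start = ts
            last = ts
            cnt += 1

    state.update((start, last, cnt))
    yield pd.DataFrame(closed, columns=["user", "startTime", "endTime", "count"])

sessions = events.groupBy("user").applyInPandasWithState(
    track_sessions,
    outputStructType="user string, startTime long, endTime long, count long",
    stateStructType="start long, last long, count long",
    outputMode="append",
    timeoutConf=GroupStateTimeout.NoTimeout,
)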

How can this be achieved in PySpark? Or what other options are there for sessionizing with PySpark Structured Streaming?

The desired output for each batch looks like this:

user    sessionId   startTime   endTime count
Bob     1           1           3       3
Alice   1           1           160     5
Alice   2           1111        1112    7
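
If arbitrary state is not needed, Spark 3.2+ also ships a built-in session_window function that PySpark can use in a streaming aggregation; it yields one row per user session with a 10-minute gap, which is close to the table above (the session bounds come back as a struct column rather than a numeric sessionId). A minimal sketch, again assuming a hypothetical streaming DataFrame named events:

# Sketch only: `events` is a hypothetical streaming DataFrame with columns
# user (string) and ev_timestamp (long, seconds). Requires Spark >= 3.2.
from pyspark.sql import functions as F

sessionsDF = (
    events
    # session_window needs a timestamp column; a long holding epoch seconds casts directly.
    .withColumn("ev_time", F.col("ev_timestamp").cast("timestamp"))
    # A watermark is required for session-window aggregation on a stream.
    .withWatermark("ev_time", "10 minutes")
    # A session closes after a 10-minute gap, mirroring maxSessionDuration.
    .groupBy("user", F.session_window("ev_time", "10 minutes"))
    .agg(
        F.min("ev_time").alias("startTime"),
        F.max("ev_time").alias("endTime"),
        F.count("*").alias("count"),
    )
)

query = (
    sessionsDF.writeStream
    .outputMode("append")  # a session is emitted once the watermark moves past its end
    .format("memory")
    .queryName("sessions")
    .start()
)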


apache-spark pyspark databricks spark-structured-streaming azure-databricks