The following code works as expected in batch mode, but fails for a streaming query with the infamous AnalysisException:

> Non-time-based windows are not supported on streaming DataFrames/Datasets
from pyspark.sql.functions import *
from pyspark.sql.window import Window

temp = [
    ('Alice', 1), ('Alice', 60), ('Alice', 160),
    ('Alice', 1111), ('Alice', 1111), ('Alice', 1111),
    ('Alice', 1111), ('Alice', 1111), ('Alice', 1111),
    ('Alice', 1112), ('Bob', 3), ('Alice', 2),
    ('Bob', 2), ('Alice', 3), ('Bob', 1)
]
temp_df = spark.createDataFrame(temp, ["user", "ev_timestamp"])

maxSessionDuration = 60 * 10  # Max session duration of 10 minutes.
client_fp_time_window = Window.partitionBy("user").orderBy("ev_timestamp")

rowsWithSessionIds = temp_df \
    .select("user", "ev_timestamp",
            lag("ev_timestamp", 1).over(client_fp_time_window).alias("prevEvTimestamp")) \
    .select("user", "ev_timestamp",
            when((col("ev_timestamp").cast('long') - col("prevEvTimestamp").cast('long')) < maxSessionDuration, 0)
            .otherwise(1).alias("isNewSession")) \
    .select("user", "ev_timestamp",
            sum("isNewSession").over(client_fp_time_window).alias("sessionId"))

display(rowsWithSessionIds)

sessionsDF = rowsWithSessionIds \
    .groupBy("user", "sessionId") \
    .agg(min("ev_timestamp").alias("startTime"),
         max("ev_timestamp").alias("endTime"),
         count("*").alias("count")) \
    .alias('Session')

display(sessionsDF)
I understand this happens because streaming queries do not support the lag() function. The recommended alternative is the mapGroupsWithState() method, but that API is only available in Scala/Java.
How can this be achieved in PySpark? Or what other options are there for sessionization with PySpark in Structured Streaming?
The desired output per batch looks like this:
user   sessionId  startTime  endTime  count
Bob    1          1          3        3
Alice  1          1          160      5
Alice  2          1111       1112     7
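One way to get mapGroupsWithState-style arbitrary stateful processing from Python is applyInPandasWithState(), added to PySpark in Spark 3.4. Below is a minimal sketch assuming Spark 3.4+; `streaming_df` (a streaming DataFrame with the same user/ev_timestamp columns as above) and the `sessionize` helper are illustrative assumptions, not part of the original question.

import pandas as pd
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

maxSessionDuration = 60 * 10  # same 10-minute gap as the batch version

def sessionize(key, pdf_iter, state: GroupState):
    # Hypothetical helper. State per user: (sessionId, startTime, endTime, count).
    if state.exists:
        session_id, start, end, cnt = state.get
    else:
        session_id, start, end, cnt = 1, None, None, 0
    for pdf in pdf_iter:
        # Simplification: sorts within the micro-batch only; late events
        # arriving in a later batch are not reordered.
        for ts in sorted(pdf["ev_timestamp"].tolist()):
            if start is None:
                start, end, cnt = ts, ts, 1
            elif ts - end < maxSessionDuration:
                end, cnt = ts, cnt + 1
            else:
                # Gap exceeded: emit the finished session, open a new one.
                yield pd.DataFrame(
                    [(key[0], session_id, start, end, cnt)],
                    columns=["user", "sessionId", "startTime", "endTime", "count"])
                session_id, start, end, cnt = session_id + 1, ts, ts, 1
    if start is not None:
        state.update((session_id, start, end, cnt))

sessions = streaming_df \
    .groupBy("user") \
    .applyInPandasWithState(
        sessionize,
        outputStructType="user string, sessionId int, startTime long, endTime long, count int",
        stateStructType="sessionId int, startTime long, endTime long, count int",
        outputMode="append",
        timeoutConf=GroupStateTimeout.NoTimeout)

Note that with NoTimeout an in-progress session is only emitted once a later event closes it; a real job would typically use GroupStateTimeout.ProcessingTimeTimeout to flush idle sessions.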
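As for other options: Spark 3.2+ ships a built-in session_window() function that performs gap-based sessionization natively in Structured Streaming, so no lag() is needed. A sketch under the same `streaming_df` assumption; the cast of the integer ev_timestamp to a timestamp is also an assumption:

from pyspark.sql.functions import col, session_window, min as min_, max as max_, count

sessionsDF = streaming_df \
    .withColumn("event_time", col("ev_timestamp").cast("timestamp")) \
    .withWatermark("event_time", "10 minutes") \
    .groupBy("user", session_window("event_time", "10 minutes")) \
    .agg(min_("ev_timestamp").alias("startTime"),
         max_("ev_timestamp").alias("endTime"),
         count("*").alias("count"))

Unlike the batch code, this produces a session_window struct column (session start/end timestamps) per group rather than an incrementing sessionId.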