Spark中的pyspark日期范围计算

问题描述 投票:0回答:1

我正在尝试处理每个用户的网站登录会话数据。我正在将S3会话日志文件读入RDD。数据看起来像这样。

----------------------------------------
User | Site   | Session start   | Session end
---------------------------------------
Joe  |Waterloo| 9/21/19 3:04 AM |9/21/19 3:18 AM

Stacy|Kirkwood| 8/4/19 3:06 PM  |8/4/19 3:54 PM

John |Waterloo| 9/21/19 8:48 AM |9/21/19 9:05 AM

Stacy|Kirkwood| 8/4/19 4:16 PM  |8/4/19 5:41 PM
...
...

我想找出在一天中的每一秒内,有多少用户登录。

示例:我可能只为9/21/19处理此数据。因此,我需要删除所有其他记录,然后删除19年9月21日所有24小时中每小时的SUM用户会话。在9/21/19的所有时间内,输出应该为24行,然后在一天中的每一秒进行计数(按秒,每秒数据!)。

使用rdds或DF在pyspark中可以执行此操作吗?(对于构建网格的迟到表示歉意)。谢谢

pyspark rdd pyspark-sql pyspark-dataframes
1个回答
0
投票

尝试检查此:

初始化过滤器。

val filter = to_date("2019-09-21")
val startFilter = to_timestamp("2019-09-21 00:00:00.000")
val endFilter = to_timestamp("2019-09-21 23:59:59.999")

生成范围(0 .. 23)。

hours = spark.range(24).collect()

获取与过滤器匹配的实际用户会话。

df = sessions.alias("s")\
    .where(filter >= to_date(s.start) & filter <= to_date(s.end))\
    .select(s.user, s.site, \
            when(s.start < startFilter, startFilter).otherwise(s.start).alias("start"), \
            when(s.end > endFilter, endFilter).otherwise(s.end).alias("end"))

结合小时范围内的匹配用户会话。

df2 = df.join(range, hours.id.between(hour(df.start), hour(df.end)), 'inner')\
    .select(df.user, hours.id.alias("hour"), \
        (when(hour(df.end) > hours.id, 360).otherwise(minute(df.end) * 60 + second(df.end)) - \
         when(hour(df.start) < hours.id, 0).otherwise(minute(df.start) * 60 + second(df.start))).alias("seconds"))

生成摘要:计算每小时会话的用户计数和秒数总和。

df2.groupBy(df2.hour)\
    .agg(count(df2.user).alias("user counts"), \
         sum(dg2.seconds).alias("seconds")) \
    .show()

希望这会有所帮助。

© www.soinside.com 2019 - 2024. All rights reserved.