我正在尝试处理每个用户的网站登录会话数据。我正在将S3会话日志文件读入RDD。数据看起来像这样。
----------------------------------------
User | Site | Session start | Session end
---------------------------------------
Joe |Waterloo| 9/21/19 3:04 AM |9/21/19 3:18 AM
Stacy|Kirkwood| 8/4/19 3:06 PM |8/4/19 3:54 PM
John |Waterloo| 9/21/19 8:48 AM |9/21/19 9:05 AM
Stacy|Kirkwood| 8/4/19 4:16 PM |8/4/19 5:41 PM
...
...
我想找出在一天中的每一秒内,有多少用户登录。
示例:我可能只为9/21/19
处理此数据。因此,我需要删除所有其他记录,然后删除19年9月21日所有24小时中每小时的SUM用户会话。在9/21/19的所有时间内,输出应该为24行,然后在一天中的每一秒进行计数(按秒,每秒数据!)。
使用rdds或DF在pyspark中可以执行此操作吗?(对于构建网格的迟到表示歉意)。谢谢
尝试检查此:
初始化过滤器。
val filter = to_date("2019-09-21")
val startFilter = to_timestamp("2019-09-21 00:00:00.000")
val endFilter = to_timestamp("2019-09-21 23:59:59.999")
生成范围(0 .. 23)。
hours = spark.range(24).collect()
获取与过滤器匹配的实际用户会话。
df = sessions.alias("s")\
.where(filter >= to_date(s.start) & filter <= to_date(s.end))\
.select(s.user, s.site, \
when(s.start < startFilter, startFilter).otherwise(s.start).alias("start"), \
when(s.end > endFilter, endFilter).otherwise(s.end).alias("end"))
结合小时范围内的匹配用户会话。
df2 = df.join(range, hours.id.between(hour(df.start), hour(df.end)), 'inner')\
.select(df.user, hours.id.alias("hour"), \
(when(hour(df.end) > hours.id, 360).otherwise(minute(df.end) * 60 + second(df.end)) - \
when(hour(df.start) < hours.id, 0).otherwise(minute(df.start) * 60 + second(df.start))).alias("seconds"))
生成摘要:计算每小时会话的用户计数和秒数总和。
df2.groupBy(df2.hour)\
.agg(count(df2.user).alias("user counts"), \
sum(dg2.seconds).alias("seconds")) \
.show()
希望这会有所帮助。