I have raw JSON data like the following:
{"event" : "login","time" : "2019-11-20 00:14:46","user_id" : 978699}
{"event" : "logout","time" : "2019-11-20 00:14:46","user_id" : 992210}
{"event" : "login","time" : "2019-11-20 00:14:46","user_id" : 823323}
{"event" : "logout","time" : "2019-11-20 00:14:47","user_id" : 978699}
{"event" : "logout","time" : "2019-11-20 00:14:47","user_id" : 823323}
{"event" : "login","time" : "2019-11-20 00:14:50","user_id" : 978699}
{"event" : "logout","time" : "2019-11-20 00:14:57","user_id" : 978699}
I created a DataFrame from this raw data:
val rawDataFrame = sparkSession.read.option("multiline", "true").json(cleanJsonLines)
I need to find out how many seconds each user was logged in to our system. The expected final result looks like this:
{"user_id": 978699, "logged_in_sec": 8} // (2019-11-20 00:14:47 - 2019-11-20 00:14:46) + (2019-11-20 00:14:57 - 2019-11-20 00:14:50)
{"user_id": 992210, "logged_in_sec": 0}
{"user_id": 823323, "logged_in_sec": 1}
I am new to Spark and Scala and can't work out the window functions needed for this problem.
I would rather not write procedural code that iterates over every row of the DataFrame and, for each user_id, computes the difference between the previous "login" event and the current "logout" event.
Please suggest an approach to solving this. Thanks for reading.
Based on your data, here you go:
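One way to do this with window functions, without iterating row by row, is to look at each "logout" row together with the immediately preceding event of the same user via `lag`. A sketch (assuming `rawDataFrame` has the columns `event`, `time`, and `user_id` as shown in your sample, and that every session is a "login" row directly followed by a "logout" row for that user):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Order each user's events by time so that lag() sees the previous event.
val w = Window.partitionBy("user_id").orderBy("ts")

val result = rawDataFrame
  // Convert the string timestamp to epoch seconds for arithmetic.
  .withColumn("ts", unix_timestamp(col("time"), "yyyy-MM-dd HH:mm:ss"))
  // Pull the previous event and its timestamp for the same user.
  .withColumn("prev_event", lag("event", 1).over(w))
  .withColumn("prev_ts", lag("ts", 1).over(w))
  // A session is a "logout" whose preceding row is a "login";
  // its duration is the difference of the two timestamps, else 0.
  .withColumn("session_sec",
    when(col("event") === "logout" && col("prev_event") === "login",
      col("ts") - col("prev_ts")).otherwise(lit(0L)))
  // Sum all session durations per user.
  .groupBy("user_id")
  .agg(sum("session_sec").as("logged_in_sec"))

result.show()
```

For your sample this yields 1 + 7 = 8 seconds for user 978699, 0 for 992210 (a logout with no preceding login contributes nothing), and 1 for 823323. Note that if two events of the same user can share the exact same timestamp, you would need a secondary ordering column to keep login/logout pairs in the right order.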