火花中的共享状态?

问题描述 投票:0回答:1

我正在阅读防火墙日志数据,其中包括以下内容:

(UniqueID,start_or_stop,timestamp)在某些时候,每个“start”后面都是stop,当它出现时我想输出(UniqueID,start_time,stop_time)

这意味着通常跟踪状态,当行阅读器看到开始然后在字典中记录时,当它看到停止时它然后从字典中删除开始并发出输出。

我的问题是:如何使用apache spark跟踪这种共享状态?

值得指出的是,一旦达到停止,就可以重复使用UniqueID - 它来自sourceIP-sourcePort-destIP-destPort,可以重复使用。

apache-spark pyspark
1个回答
1
投票

假设约束指示stop的行将始终跟随指示给定startUniqueID的行,请考虑以下输入(0表示开始,1表示停止事件):

UniqueID,start_or_stop,timestamp
u1,0,2018-01-22 13:04:32
u2,0,2018-01-22 13:04:35
u2,1,2018-01-25 18:55:08
u3,0,2018-01-25 18:56:17
u1,1,2018-01-25 20:51:43
u2,0,2018-01-31 07:48:43
u3,1,2018-01-31 07:48:48
u1,0,2018-02-02 09:40:58
u2,1,2018-02-02 09:41:01
u1,1,2018-02-05 14:03:27

然后,我们可以应用以下转换来获得您想要的内容。代码在scala中,但python中提供了相同的函数(因此,我认为可以很容易地推断和移植):

//Define the window specification, after partition and sort, select 
//the 2 rows in the window group that will contain the start/stop time
val wSpec = Window.partitionBy('UniqueID).orderBy('timestamp).rowsBetween(0, 1)

//Assume df is the DataFrame loaded with above data
df.withColumn("Last", last('timestamp) over wSpec). //add a new col having stop time
    where("start_or_stop = 0").  //Just need the alternate rows
    drop("start_or_stop"). //Drop column
    withColumnRenamed("timestamp", "start_time"). //Rename to start
    withColumnRenamed("Last", "stop_time").  //Rename to stop
    show(false)

这提供了以下输出:

+--------+---------------------+---------------------+
|UniqueID|start_time           |stop_time            |
+--------+---------------------+---------------------+
|u3      |2018-01-25 18:56:17.0|2018-01-31 07:48:48.0|
|u1      |2018-01-22 13:04:32.0|2018-01-25 20:51:43.0|
|u1      |2018-02-02 09:40:58.0|2018-02-05 14:03:27.0|
|u2      |2018-01-22 13:04:35.0|2018-01-25 18:55:08.0|
|u2      |2018-01-31 07:48:43.0|2018-02-02 09:41:01.0|
+--------+---------------------+---------------------+
© www.soinside.com 2019 - 2024. All rights reserved.