PySpark DataFrame:标记某些列值发生变化的行

问题描述 投票:0回答:1

我有一个PySpark DataFrame,列有'people'和'timestamp'(加上与问题无关的其他列)。解释是用户当时做了一些事情。

我想将“时间戳”的所有行分组,其中“时间戳”的差异不超过“阈值”值(例如5分钟)。

我是如何在PySpark中实现这一目标的?最好将DataFrame作为结果吗?

欣赏你的想法!

python pyspark apache-spark-sql pyspark-sql
1个回答
0
投票

我们假设您有['people','timestamp','activity']

SData = Row("people","session_start", "session_end")

def getSessions(dt):
    info = dt[1]
    data = []
    session_start = info[0][0]
    session_end = info[0][0]
    for x in info[1:]:
        if ((x[1] - session_end) > 5*60*1000):
            data.append(SData(dt[0], session_start, session_end)
            session_start = x[1]
        session_end = x[1]
    data.append(SData(dt[0],session_start, session_end))
    return data


rdd  = df.rdd.map(lambda x: (x[0],(x[1],x[2])))

df = rdd.groupByKey().mapValues(lambda x: sorted(x, key=lambda z:z)).flatMap(getSessions).toDF()

基本上将它映射到rdd返回到df。

没有rdd的另一种方法是创建一个udf返回的会话数组。最后,我们可以使用explode来明智地获取数据。

© www.soinside.com 2019 - 2024. All rights reserved.