替代Pyspark数据框的GroupBy吗?

问题描述 投票:0回答:1

我有一个这样的数据集:

timestamp     vars 
2             [1,2]
2             [1,2]
3             [1,2,3]
3             [1,2]

而且我想要一个这样的数据框。基本上,以上数据框中的每个值都是一个索引,并且该值的频率是该索引处的值。此计算是在每个唯一的时间戳上完成的。

timestamp     vars 
2             [0, 2, 2]
3             [0,2,2,1]

[现在,我按时间戳分组,然后对vars进行聚合/展平(为时间戳2获得类似(1,2,1,2,为时间戳3获得1,2,3,1,2)),然后我有一个使用collections.Counter的udf来获取key-> value dict。然后将这个dict转换成我想要的格式。

groupBy / agg可以任意增大(数组大小可以是数百万个,这对于Window函数来说似乎是一个很好的用例,但是我不确定如何将它们放在一起。

[还值得一提的是,我尝试了重新分区,并转换为RDD并使用groupByKey。在大型数据集上,两者的速度都非常慢(> 24小时)。

pyspark group-by pyspark-sql
1个回答
1
投票

IIUC,您可以尝试以下代码(需要spark 2.4 +):

from pyspark.sql.functions import flatten, collect_list

df = spark.createDataFrame([(2,[1,2]), (2,[1,2]), (3,[1,2,3]), (3,[1,2])],['timestamp', 'vars'])

df.groupby('timestamp').agg(flatten(collect_list('vars')).alias('data')) \
  .selectExpr(
    "timestamp", 
    "transform(sequence(0, array_max(data)), x -> size(filter(data, y -> y = x))) as vars"
  ).show(truncate=False)
+---------+------------+
|timestamp|vars        |
+---------+------------+
|3        |[0, 2, 2, 1]|
|2        |[0, 2, 2]   |
+---------+------------+

其中:

(1)我们使用groupby获取每个timestamp的扁平化列表,并将其命名为ArrayType列为data

((2)对于每个列表,使用array_max(data)查找列表的最大值,然后使用sequence函数生成从0到此最大值的序列

[(3)通过上述序列(作为x)进行迭代,并使用datax函数将其转换为size数组中具有与filter相同值的项目数。

顺便说一句,您也可以尝试使用aggregate功能:

df.groupby('timestamp').agg(flatten(collect_list('vars')).alias('data')) \
   .selectExpr("timestamp", """ 

     aggregate(   
       data,         
       /* use an array as zero_value, size = array_max(data))+1 and all values are zero */
       array_repeat(0, int(array_max(data))+1),       
       /* increment the ith value of the Array by 1 if i == y */
       (acc, y) -> transform(acc, (x,i) -> IF(i=y, x+1, x))       
     ) as vars   

""").show(truncate=False)

0
投票

IIUC,您可以尝试以下代码(需要spark 2.4 +):

from pyspark.sql.functions import flatten, collect_list

df = spark.createDataFrame([(2,[1,2]), (2,[1,2]), (3,[1,2,3]), (3,[1,2])],['timestamp', 'vars'])

df.groupby('timestamp').agg(flatten(collect_list('vars')).alias('data')) \
  .selectExpr(
    "timestamp", 
    "transform(sequence(0, array_max(data)), x -> size(filter(data, y -> y = x))) as vars"
  ).show(truncate=False)
+---------+------------+
|timestamp|vars        |
+---------+------------+
|3        |[0, 2, 2, 1]|
|2        |[0, 2, 2]   |
+---------+------------+

其中:

(1)我们使用groupby获取每个timestamp的扁平化列表,并将其命名为ArrayType列为data

((2)对于每个列表,使用array_max(data)查找列表的最大值,然后使用sequence函数生成从0到此最大值的序列

[(3)通过上述序列(作为x)进行迭代,并使用datax函数将其转换为size数组中具有与filter相同值的项目数。

© www.soinside.com 2019 - 2024. All rights reserved.