我有一个这样的数据集:
timestamp vars
2 [1,2]
2 [1,2]
3 [1,2,3]
3 [1,2]
而且我想要一个这样的数据框。基本上,以上数据框中的每个值都是一个索引,并且该值的频率是该索引处的值。此计算是在每个唯一的时间戳上完成的。
timestamp vars
2 [0, 2, 2]
3 [0,2,2,1]
[现在,我按时间戳分组,然后对vars进行聚合/展平(为时间戳2获得类似(1,2,1,2,为时间戳3获得1,2,3,1,2)),然后我有一个使用collections.Counter的udf来获取key-> value dict。然后将这个dict转换成我想要的格式。
groupBy / agg可以任意增大(数组大小可以是数百万个,这对于Window函数来说似乎是一个很好的用例,但是我不确定如何将它们放在一起。
[还值得一提的是,我尝试了重新分区,并转换为RDD并使用groupByKey。在大型数据集上,两者的速度都非常慢(> 24小时)。
IIUC,您可以尝试以下代码(需要spark 2.4 +):
from pyspark.sql.functions import flatten, collect_list
df = spark.createDataFrame([(2,[1,2]), (2,[1,2]), (3,[1,2,3]), (3,[1,2])],['timestamp', 'vars'])
df.groupby('timestamp').agg(flatten(collect_list('vars')).alias('data')) \
.selectExpr(
"timestamp",
"transform(sequence(0, array_max(data)), x -> size(filter(data, y -> y = x))) as vars"
).show(truncate=False)
+---------+------------+
|timestamp|vars |
+---------+------------+
|3 |[0, 2, 2, 1]|
|2 |[0, 2, 2] |
+---------+------------+
其中:
(1)我们使用groupby
获取每个timestamp
的扁平化列表,并将其命名为ArrayType列为data
((2)对于每个列表,使用array_max(data)查找列表的最大值,然后使用sequence函数生成从0到此最大值的序列
[(3)通过上述序列(作为x)进行迭代,并使用data
和x
函数将其转换为size数组中具有与filter相同值的项目数。
顺便说一句,您也可以尝试使用aggregate功能:
df.groupby('timestamp').agg(flatten(collect_list('vars')).alias('data')) \
.selectExpr("timestamp", """
aggregate(
data,
/* use an array as zero_value, size = array_max(data))+1 and all values are zero */
array_repeat(0, int(array_max(data))+1),
/* increment the ith value of the Array by 1 if i == y */
(acc, y) -> transform(acc, (x,i) -> IF(i=y, x+1, x))
) as vars
""").show(truncate=False)
IIUC,您可以尝试以下代码(需要spark 2.4 +):
from pyspark.sql.functions import flatten, collect_list
df = spark.createDataFrame([(2,[1,2]), (2,[1,2]), (3,[1,2,3]), (3,[1,2])],['timestamp', 'vars'])
df.groupby('timestamp').agg(flatten(collect_list('vars')).alias('data')) \
.selectExpr(
"timestamp",
"transform(sequence(0, array_max(data)), x -> size(filter(data, y -> y = x))) as vars"
).show(truncate=False)
+---------+------------+
|timestamp|vars |
+---------+------------+
|3 |[0, 2, 2, 1]|
|2 |[0, 2, 2] |
+---------+------------+
其中:
(1)我们使用groupby
获取每个timestamp
的扁平化列表,并将其命名为ArrayType列为data
((2)对于每个列表,使用array_max(data)查找列表的最大值,然后使用sequence函数生成从0到此最大值的序列
[(3)通过上述序列(作为x)进行迭代,并使用data
和x
函数将其转换为size数组中具有与filter相同值的项目数。