在Pyspark展平集团

问题描述 投票:2回答:2

我有一个pyspark数据帧。例如,

d= hiveContext.createDataFrame([("A", 1), ("B", 2), ("D", 3), ("D", 3),  ("A", 4), ("D", 3)],["Col1", "Col2"])

+----+----+
|Col1|Col2|
+----+----+
|   A|   1|
|   B|   2|
|   D|   3|
|   D|   3|
|   A|   4|
|   D|   3|
+----+----+

我想通过Col1分组,然后创建一个Col2列表。我需要压扁群体。我确实有很多专栏。

+----+----------+
|Col1|      Col2|
+----+----------+
|   A|   [1,4]  |
|   B|   [2]    |
|   D|   [3,3,3]|
+----+----------+
group-by pyspark spark-dataframe
2个回答
4
投票

你可以做一个groupBy()并使用collect_list()作为你的聚合函数:

import pyspark.sql.functions as f
d.groupBy('Col1').agg(f.collect_list('Col2').alias('Col2')).show()
#+----+---------+
#|Col1|     Col2|
#+----+---------+
#|   B|      [2]|
#|   D|[3, 3, 3]|
#|   A|   [1, 4]|
#+----+---------+

Update

如果要组合多个列,可以在每个列上使用collect_list(),并使用struct()udf()组合生成的列表。请考虑以下示例:

创建虚拟数据

from operator import add
import pyspark.sql.functions as f

# create example dataframe
d = sqlcx.createDataFrame(
    [
        ("A", 1, 10),
        ("B", 2, 20),
        ("D", 3, 30),
        ("D", 3, 10),
        ("A", 4, 20),
        ("D", 3, 30)
    ],
    ["Col1", "Col2", "Col3"]
)

将所需列收集到列表中

假设您有一个要列入列表的列的列表。您可以执行以下操作:

cols_to_combine = ['Col2', 'Col3']
d.groupBy('Col1').agg(*[f.collect_list(c).alias(c) for c in cols_to_combine]).show()
#+----+---------+------------+
#|Col1|     Col2|        Col3|
#+----+---------+------------+
#|   B|      [2]|        [20]|
#|   D|[3, 3, 3]|[30, 10, 30]|
#|   A|   [4, 1]|    [20, 10]|
#+----+---------+------------+

将结果列表合并为一列

现在我们要将列表列合并到一个列表中。如果我们使用struct(),我们将获得以下内容:

d.groupBy('Col1').agg(*[f.collect_list(c).alias(c) for c in cols_to_combine])\
    .select('Col1', f.struct(*cols_to_combine).alias('Combined'))\
    .show(truncate=False)
#+----+------------------------------------------------+
#|Col1|Combined                                        |
#+----+------------------------------------------------+
#|B   |[WrappedArray(2),WrappedArray(20)]              |
#|D   |[WrappedArray(3, 3, 3),WrappedArray(10, 30, 30)]|
#|A   |[WrappedArray(1, 4),WrappedArray(10, 20)]       |
#+----+------------------------------------------------+

展平包裹的阵列

差不多了。我们只需要结合WrappedArrays。我们可以用udf()实现这个目标:

combine_wrapped_arrays = f.udf(lambda val: reduce(add, val), ArrayType(IntegerType()))
d.groupBy('Col1').agg(*[f.collect_list(c).alias(c) for c in cols_to_combine])\
    .select('Col1', combine_wrapped_arrays(f.struct(*cols_to_combine)).alias('Combined'))\
    .show(truncate=False)
#+----+---------------------+
#|Col1|Combined             |
#+----+---------------------+
#|B   |[2, 20]              |
#|D   |[3, 3, 3, 30, 10, 30]|
#|A   |[1, 4, 10, 20]       |
#+----+---------------------+

参考


Update 2

更简单的方法,无需处理WrappedArrays:

from operator import add

combine_udf = lambda cols: f.udf(
    lambda *args: reduce(add, args),
    ArrayType(IntegerType())
)

d.groupBy('Col1').agg(*[f.collect_list(c).alias(c) for c in cols_to_combine])\
    .select('Col1', combine_udf(cols_to_combine)(*cols_to_combine).alias('Combined'))\
    .show(truncate=False)
#+----+---------------------+
#|Col1|Combined             |
#+----+---------------------+
#|B   |[2, 20]              |
#|D   |[3, 3, 3, 30, 10, 30]|
#|A   |[1, 4, 10, 20]       |
#+----+---------------------+

注意:最后一步仅适用于所有列的数据类型相同的情况。您不能使用此函数将包装数组与混合类型组合在一起。


2
投票

从火花2.4你可以使用pyspark.sql.functions.flatten

import pyspark.sql.functions as f
df.groupBy('Col1').agg(f.flatten(f.collect_list('Col2')).alias('Col2')).show()
© www.soinside.com 2019 - 2024. All rights reserved.