使用PySpark根据行值的模式对记录进行分组

问题描述 投票:0回答:1

我有一个包含三列的表:

Table A:

+----+----+----------+                                                          
|col1|col2|row_number|
+----+----+----------+
|   X|   1|         1|
|   Y|   0|         2|
|   Z|   2|         3|
|   A|   1|         4|
|   B|   0|         5|
|   C|   0|         6|
|   D|   2|         7|
|   P|   1|         8|
|   Q|   2|         9|
+----+----+----------+

我想通过基于“ col2”值对记录进行分组来连接“ col1”中的字符串。“ col2”的模式为1,后跟任意数量的0,然后为2。我想对具有“ col2”的记录进行分组,这些记录以1开头,以2结尾(必须保持数据帧的顺序-您可以使用订单的row_number列)

例如,由于“ col2”具有“ 1-0-2”,因此前3个记录可以组合在一起。接下来的4条记录可以分组在一起,因为它们的“ col2”值具有“ 1-0-0-2”]

将这些记录分组后,可以使用“ concat_ws”来完成串联部分。但是,如何基于“ 1-0s-2”模式对这些记录进行分组有帮助吗?

预期输出:

+----------+
|output_col|
+----------+
|       XYZ|   
|      ABCD|   
|        PQ| 
+----------+

您可以使用以下代码创建此样本数据:

schema = StructType([StructField("col1", StringType())\
                   ,StructField("col2", IntegerType())\
                   ,StructField("row_number", IntegerType())])

data = [['X', 1, 1], ['Y', 0, 2], ['Z', 2, 3], ['A', 1, 4], ['B', 0, 5], ['C', 0, 6], ['D', 2, 7], ['P', 1, 8], ['Q', 2, 9]]

df = spark.createDataFrame(data,schema=schema)
df.show()
python apache-spark pyspark apache-spark-sql pyspark-sql
1个回答
0
投票

我建议您使用window功能。首先使用由row_number排序的窗口以获取incremental sum。此第一个增量总和将具有3的倍数,基本上是您需要的组的端点。用同一窗口的滞后时间替换它们,然后在此列上运行另一个增量和得到所需的增量在增量中的分区。现在,您可以使用w2 划分为 "incremental_sum"来使用F.collect_listcollect您的字符串。为此,您可以使用array_join,然后使用distinct获得所需的输出。 (array_join仅用于spark2.4。)>

如果您不想使用distinct()

,则可以创建另一个窗口,并获取w2的row_number(带有orderBy)并过滤第一行。
from pyspark.sql import functions as F
w=Window().orderBy("row_number")
w2=Window().partitionBy("incremental_sum")
df.withColumn("incremental_sum", F.sum("col2").over(w))\
  .withColumn("lag", F.lag("incremental_sum").over(w))\
  .withColumn("incremental_sum", F.when(F.col("incremental_sum")%3==0, F.col("lag")).otherwise(F.col("incremental_sum")))\
  .withColumn("output_col", F.array_join(F.collect_list("col1").over(w2),''))\
  .select(F.col("output_col")).distinct().show()

+----------+
|output_col|
+----------+
|       XYZ|
|      ABCD|
|        PQ|
+----------+
© www.soinside.com 2019 - 2024. All rights reserved.