我有一个包含三列的表:
Table A:
+----+----+----------+
|col1|col2|row_number|
+----+----+----------+
| X| 1| 1|
| Y| 0| 2|
| Z| 2| 3|
| A| 1| 4|
| B| 0| 5|
| C| 0| 6|
| D| 2| 7|
| P| 1| 8|
| Q| 2| 9|
+----+----+----------+
我想通过基于“ col2”值对记录进行分组来连接“ col1”中的字符串。“ col2”的模式为1,后跟任意数量的0,然后为2。我想对具有“ col2”的记录进行分组,这些记录以1开头,以2结尾(必须保持数据帧的顺序-您可以使用订单的row_number列)
例如,由于“ col2”具有“ 1-0-2”,因此前3个记录可以组合在一起。接下来的4条记录可以分组在一起,因为它们的“ col2”值具有“ 1-0-0-2”]
将这些记录分组后,可以使用“ concat_ws”来完成串联部分。但是,如何基于“ 1-0s-2”模式对这些记录进行分组有帮助吗?
预期输出:
+----------+
|output_col|
+----------+
| XYZ|
| ABCD|
| PQ|
+----------+
您可以使用以下代码创建此样本数据:
schema = StructType([StructField("col1", StringType())\
,StructField("col2", IntegerType())\
,StructField("row_number", IntegerType())])
data = [['X', 1, 1], ['Y', 0, 2], ['Z', 2, 3], ['A', 1, 4], ['B', 0, 5], ['C', 0, 6], ['D', 2, 7], ['P', 1, 8], ['Q', 2, 9]]
df = spark.createDataFrame(data,schema=schema)
df.show()
我建议您使用window
功能。首先使用由row_number
排序的窗口以获取incremental sum。此第一个增量总和将具有3的倍数,基本上是您需要的组的端点。用同一窗口的滞后时间替换它们,然后在此列上运行另一个增量和得到所需的增量在增量中的分区。现在,您可以使用w2
划分为 "incremental_sum"
来使用F.collect_list
至collect
您的字符串。为此,您可以使用array_join,然后使用distinct获得所需的输出。 (array_join仅用于spark2.4。)>
如果您不想使用distinct()
,则可以创建另一个窗口,并获取w2的row_number(带有orderBy)并过滤第一行。from pyspark.sql import functions as F
w=Window().orderBy("row_number")
w2=Window().partitionBy("incremental_sum")
df.withColumn("incremental_sum", F.sum("col2").over(w))\
.withColumn("lag", F.lag("incremental_sum").over(w))\
.withColumn("incremental_sum", F.when(F.col("incremental_sum")%3==0, F.col("lag")).otherwise(F.col("incremental_sum")))\
.withColumn("output_col", F.array_join(F.collect_list("col1").over(w2),''))\
.select(F.col("output_col")).distinct().show()
+----------+
|output_col|
+----------+
| XYZ|
| ABCD|
| PQ|
+----------+