我有一个表
data
,其中包含三列:id
、time
和 text
。具有相同 id
的行包含按 time
排序的相同长文本。目标是按 id
分组,按 time
排序,然后聚合它们(连接所有 text
)。我正在使用 PySpark。
我可以使用窗口函数获取组内元素的顺序:
import pyspark.sql.functions as F
from pyspark.sql.window import Window as W
w = W.partitionBy(F.col('id')).orderBy(F.col('time').asc())
(
data
.withColumn('text_num', F.row_number().over(w))
)
我也可以按组连接文本,但这并不能保证顺序:
(
data
.groupby('id')
.agg(F.concat_ws(' ', F.collect_list(F.col('text'))).alias('concat_text'))
)
如何分组、在组内排序,然后聚合排序后的数据?
假设我们有一个数据框:
data = spark.createDataFrame(
[('a', 1, 'str1'), ('a', 2, 'str2')],
schema=['id', 'time', 'text']
)
data.printSchema()
data.show(1, False)
+---+----+----+
|id |time|text|
+---+----+----+
|a |1 |str1|
+---+----+----+
您可以使用
collect_list
依次收集每组的元素并选择最大组:
data.withColumn(
'text_lst', func.collect_list('text').over(Window.partitionBy('id').orderBy(func.asc('time')))
).groupBy(
'id'
).agg(
func.concat_ws('-', func.max('text_lst')).alias('text_concat')
).show(
10, False
)
+---+-----------+
|id |text_concat|
+---+-----------+
|a |str1-str2 |
+---+-----------+