PySpark在一个分区中的第一个和最后一个函数

问题描述 投票:1回答:1

我有这样的pyspark代码,

spark_df = spark_df.orderBy('id', 'a1', 'c1')
out_df = spark_df.groupBy('id', 'a1', 'a2').agg(
    F.first('c1').alias('c1'),
    F.last('c2').alias('c2'),
    F.first('c3').alias('c3'))

我需要保持数据按订单ID,a1和c1排序。然后,在键id,a1和c1定义的组上选择如上所示的列。

由于第一个和最后一个不确定性,我将代码更改为看起来丑陋的代码,该代码可以工作,但我不确定这是有效的。

w_first = Window.partitionBy('id', 'a1', 'a2').orderBy('c1')
w_last = Window.partitionBy('id', 'a1', 'a2').orderBy(F.desc('c1'))

out_first = spark_df.withColumn('Rank_First', F.rank().over(w_first)).filter(F.col('Rank_First') == 1).drop(
    'Rank_First')
out_last = spark_df.withColumn('Rank_Last', F.rank().over(w_last)).filter(F.col('Rank_First') == 1).drop(
    'Rank_Last')

out_first = out_first.withColumnRenamed('c1', 'First_c1') \
    .withColumnRenamed('c2', 'First_c2') \
    .withColumnRenamed('c3', 'First_c3')

out_last = out_last.withColumnRenamed('c1', 'Last_c1') \
    .withColumnRenamed('c2', 'Last_c2') \
    .withColumnRenamed('c3', 'Last_c3')

out_df = out_first.join(out_last, ['id', 'a1', 'a2']) \
    .select('id', 'a1', 'a2', F.col('First_c1').alias('c1'),
            F.col('Last_c2').alias('c2'),
            F.col('First_c3').alias('c3'))

我正在尝试一种更好,更有效的替代方案。当数据量巨大时,我会遇到性能瓶颈。

是否有更好的替代方案,可以一次按特定顺序对一个窗口进行先行和后行。

pyspark pyspark-sql pyspark-dataframes
1个回答
0
投票

您仍然可以在保证确定性的窗口上使用lastfirst函数。指定窗口时,需要在rowsBetween上添加边界,以便last给出正确的值(按照此post)。

尝试一下:

最新问题
© www.soinside.com 2019 - 2024. All rights reserved.