Thoughts on improving an existing PySpark aggregation

Problem description

Here is my Spark dataframe:

+---+----------------------------------+----------+----------+
|id |timestamp                         |Fname     |Lname     |
+---+----------------------------------+----------+----------+
|1  |2024-01-19T11:52:44.775205Z       |Robert    |Albert    |
|1  |2024-01-20T11:52:44.775205Z       |Remo      |Lergos    |
|2  |2024-01-21T11:52:44.775205Z       |Charlie   |Jameson   |
|2  |2024-01-22T11:52:44.775205Z       |Anastacio |Sporer    |
|2  |2024-01-23T11:52:44.775205Z       |Luz       |Toy       |
|3  |2024-01-24T11:52:44.775205Z       |Crystal   |Hills     |
|3  |2024-01-25T11:52:44.775205Z       |Nicholas  |Johnson   |
+---+----------------------------------+----------+----------+
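
For reproducibility, this is roughly how the sample dataframe can be created (a sketch only; the timestamp is kept as a plain string here, which still orders correctly in ISO-8601 form):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample rows copied from the table above.
df = spark.createDataFrame(
    [
        (1, "2024-01-19T11:52:44.775205Z", "Robert", "Albert"),
        (1, "2024-01-20T11:52:44.775205Z", "Remo", "Lergos"),
        (2, "2024-01-21T11:52:44.775205Z", "Charlie", "Jameson"),
        (2, "2024-01-22T11:52:44.775205Z", "Anastacio", "Sporer"),
        (2, "2024-01-23T11:52:44.775205Z", "Luz", "Toy"),
        (3, "2024-01-24T11:52:44.775205Z", "Crystal", "Hills"),
        (3, "2024-01-25T11:52:44.775205Z", "Nicholas", "Johnson"),
    ],
    ["id", "timestamp", "Fname", "Lname"],
)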

These are the steps involved:

  1. Group the rows by "id".
  2. Collect all the Fname/Lname pairs per id as a list of dicts.
  3. Take the Fname and Lname belonging to the latest timestamp of each "id" and store them as a dict in a separate column.
  4. The latest timestamp used in step 3 should also be stored in its own column.

Based on the steps above, this is the result dataframe I am trying to produce:

+---+--------------------------------------+---------------------------+------------------------------------------------------------------------------------------------------------+
|id |latest_names                          |latest_timestamp           |all_names                                                                                                   |
+---+--------------------------------------+---------------------------+------------------------------------------------------------------------------------------------------------+
|1  |{"Fname":"Remo","Lname":"Lergos"}     |2024-01-20T11:52:44.775205Z|[{"Fname":"Remo","Lname":"Lergos"},{"Fname":"Robert","Lname":"Albert"}]                                     |
|2  |{"Fname":"Luz","Lname":"Toy"}         |2024-01-23T11:52:44.775205Z|[{"Fname":"Luz","Lname":"Toy"},{"Fname":"Anastacio","Lname":"Sporer"},{"Fname":"Charlie","Lname":"Jameson"}]|
|3  |{"Fname":"Nicholas","Lname":"Johnson"}|2024-01-25T11:52:44.775205Z|[{"Fname":"Nicholas","Lname":"Johnson"},{"Fname":"Crystal","Lname":"Hills"}]                                |
+---+--------------------------------------+---------------------------+------------------------------------------------------------------------------------------------------------+

I tried the following PySpark code with a windowspec, taking the first value per id with the timestamp ordered descending:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

windowspec = Window.partitionBy("id").orderBy(F.col("timestamp").desc())
columns_names = ["Fname", "Lname"]

result = (
    df
    # JSON dict per row, later collected into the all_names list
    .withColumn("all_names", F.to_json(F.struct("Fname", "Lname")))
    # JSON dict built from the first (latest) row of each id partition
    .withColumn(
        "latest_names",
        F.to_json(
            F.struct(*[F.first(field).over(windowspec).alias(field) for field in columns_names])
        ),
    )
    # timestamp of that latest row
    .withColumn("latest_timestamp", F.first("timestamp").over(windowspec))
    .groupBy("id")
    .agg(
        F.collect_set("all_names").alias("all_names"),
        F.first("latest_names").alias("latest_names"),
        F.first("latest_timestamp").alias("latest_timestamp"),
    )
)

I am able to get the desired result this way, but I would like to know whether there is a better approach. I have other column groups (Address1, Address2, Address3) on which I need the same 'latest value per id' operation, and I currently reuse a single windowspec for all of them (a sketch of that pattern is below). Is there a better way?
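
A minimal sketch of that reuse pattern, assuming the Address1/Address2/Address3 columns exist in the real data (they are not part of the sample above):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

windowspec = Window.partitionBy("id").orderBy(F.col("timestamp").desc())

# Address1-Address3 are assumed placeholder columns, not part of the sample dataframe.
column_groups = {
    "latest_names": ["Fname", "Lname"],
    "latest_address": ["Address1", "Address2", "Address3"],
}

df_latest = df
for out_col, cols in column_groups.items():
    # Same window reused for every group: take the values from the latest row per id.
    df_latest = df_latest.withColumn(
        out_col,
        F.to_json(F.struct(*[F.first(c).over(windowspec).alias(c) for c in cols])),
    )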

python performance apache-spark pyspark aggregate
1 Answer

Give this a try and let me know what you think:

import pyspark.sql.functions as f

df = (
    df
    .groupBy('id')
    .agg(
        # keep every (timestamp, Fname, Lname) combination for the id
        f.collect_list(f.struct('timestamp', 'Fname', 'Lname')).alias('all_names'),
        # latest timestamp for the id
        f.max('timestamp').alias('latest_timestamp')
    )
    # keep only the struct whose timestamp equals the latest one
    .withColumn('latest_name', f.expr("filter(all_names, x -> x.timestamp = latest_timestamp)")[0])
)
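
If you need the exact JSON-shaped columns from the question for several column groups at once, the same idea can be pushed into the aggregation itself. This is only a sketch, not part of the answer above: it relies on max_by (available from Spark 3.3), and the Address1-Address3 columns are assumed from the question rather than the sample data.

import pyspark.sql.functions as f

# Column groups whose latest values we want, keyed by output column name.
# Address1-Address3 are assumed to exist; they are not in the sample dataframe.
column_groups = {
    "latest_names": ["Fname", "Lname"],
    "latest_address": ["Address1", "Address2", "Address3"],
}

agg_exprs = [f.max("timestamp").alias("latest_timestamp")]
for out_col, cols in column_groups.items():
    # max_by picks the struct from the row with the greatest timestamp (Spark 3.3+).
    agg_exprs.append(
        f.to_json(f.max_by(f.struct(*cols), f.col("timestamp"))).alias(out_col)
    )

result = (
    df.groupBy("id")
    .agg(
        f.collect_list(f.to_json(f.struct("Fname", "Lname"))).alias("all_names"),
        *agg_exprs,
    )
)

This avoids a window entirely: a single groupBy pass produces the latest timestamp, the latest values of every column group, and the full list of names.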