Get the DataFrame rows with the latest timestamp


I have this dataframe:

+------+-------------------+-----------+------------------------+------------------------+
|brand |original_timestamp |weight     |arrival_timestamp       |features                |
+------+-------------------+-----------+------------------------+------------------------+
|BR1   |1632899456         |4.0        |2023-08-09 17:12:24.002 |{f1 -> 12, f2 -> 52}    |
|BR1   |1632899456         |4.0        |2023-08-09 17:14:24.002 |{f1 -> 42, f2 -> 12}    |
|BR1   |1632899456         |2.0        |2023-08-09 17:46:24.002 |{f1 -> 42, f2 -> 12}    |
|BR3   |1632899155         |2.0        |2023-08-09 17:14:24.002 |{f1 -> 72, f2 -> 50}    |
|BR3   |1632899155         |9.0        |2023-08-09 17:20:24.002 |{f1 -> 72, f2 -> 50}    |
+------+-------------------+-----------+------------------------+------------------------+

I want to get the latest arrival_timestamp for each combination of brand, original_timestamp, and features. Here is the code I'm using:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val windowSpec = Window.partitionBy("brand","original_timestamp","features").orderBy(col("arrival_timestamp").desc)

df.withColumn("maxTS", first("arrival_timestamp").over(windowSpec))
  .select("*").where(col("maxTS") === col("arrival_timestamp"))
  .drop("maxTS")
  .show(false)

This is the output I expect:

+------+-------------------+-----------+------------------------+------------------------+
|brand |original_timestamp |weight     |arrival_timestamp       |features                |
+------+-------------------+-----------+------------------------+------------------------+
|BR1   |1632899456         |4.0        |2023-08-09 17:12:24.002 |{f1 -> 12, f2 -> 52}    |
|BR1   |1632899456         |2.0        |2023-08-09 17:46:24.002 |{f1 -> 42, f2 -> 12}    |
|BR3   |1632899155         |9.0        |2023-08-09 17:20:24.002 |{f1 -> 72, f2 -> 50}    |
+------+-------------------+-----------+------------------------+------------------------+

However, I get this error:

java.lang.IllegalStateException: Grouping/join/window partition keys cannot be map type.

That is because the features column is a map.

Is there another way to do this grouping?

I'm using Spark version 3.2.2 with SQLContext, and Scala.
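For reference, here is a minimal sketch of how a comparable dataframe with a map-typed features column could be built (the literal values mirror the table above; an active SparkSession named spark is assumed):

import org.apache.spark.sql.functions._
import spark.implicits._

// Small reproduction of the schema above; `features` becomes a MapType column.
val df = Seq(
  ("BR1", 1632899456L, 4.0, "2023-08-09 17:12:24.002", Map("f1" -> 12, "f2" -> 52)),
  ("BR1", 1632899456L, 4.0, "2023-08-09 17:14:24.002", Map("f1" -> 42, "f2" -> 12)),
  ("BR1", 1632899456L, 2.0, "2023-08-09 17:46:24.002", Map("f1" -> 42, "f2" -> 12)),
  ("BR3", 1632899155L, 2.0, "2023-08-09 17:14:24.002", Map("f1" -> 72, "f2" -> 50)),
  ("BR3", 1632899155L, 9.0, "2023-08-09 17:20:24.002", Map("f1" -> 72, "f2" -> 50))
).toDF("brand", "original_timestamp", "weight", "arrival_timestamp", "features")
  .withColumn("arrival_timestamp", to_timestamp($"arrival_timestamp"))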

dataframe scala apache-spark apache-spark-sql spark-structured-streaming
1 Answer

Use the to_json function to convert the map-typed column into a string and pass that into Window.partitionBy. See the example solution below.

scala> mdf.show(false)
+-----+------------------+------+-----------------------+--------------------+
|brand|original_timestamp|weight|arrival_timestamp      |features            |
+-----+------------------+------+-----------------------+--------------------+
|BR1  |1632899456        |4.0   |2023-08-09 17:12:24.002|{f1 -> 12, f2 -> 52}|
|BR1  |1632899456        |4.0   |2023-08-09 17:14:24.002|{f1 -> 42, f2 -> 12}|
|BR1  |1632899456        |2.0   |2023-08-09 17:46:24.002|{f1 -> 42, f2 -> 12}|
|BR3  |1632899155        |2.0   |2023-08-09 17:14:24.002|{f1 -> 72, f2 -> 50}|
|BR3  |1632899155        |9.0   |2023-08-09 17:20:24.002|{f1 -> 72, f2 -> 50}|
+-----+------------------+------+-----------------------+--------------------+
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> import org.apache.spark.sql.expressions._
import org.apache.spark.sql.expressions._

scala> val windowSpec = Window.partitionBy($"brand",$"original_timestamp",to_json($"features").as("features")).orderBy(col("arrival_timestamp").desc)
windowSpec: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@10b34165

scala> mdf.withColumn("maxTs", first($"arrival_timestamp").over(windowSpec)).where($"maxTs" === $"arrival_timestamp").drop("maxTs").show(false)
+-----+------------------+------+-----------------------+--------------------+
|brand|original_timestamp|weight|arrival_timestamp      |features            |
+-----+------------------+------+-----------------------+--------------------+
|BR1  |1632899456        |4.0   |2023-08-09 17:12:24.002|{f1 -> 12, f2 -> 52}|
|BR1  |1632899456        |2.0   |2023-08-09 17:46:24.002|{f1 -> 42, f2 -> 12}|
|BR3  |1632899155        |9.0   |2023-08-09 17:20:24.002|{f1 -> 72, f2 -> 50}|
+-----+------------------+------+-----------------------+--------------------+
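If ties matter (two rows sharing the exact latest arrival_timestamp for the same key would both survive the filter above), a variant that keeps exactly one row per key uses row_number over the same to_json-based window. This is a sketch against the same mdf, not part of the original answer:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// Same partitioning idea: serialize the map column so it can be used as a key.
val w = Window
  .partitionBy($"brand", $"original_timestamp", to_json($"features"))
  .orderBy($"arrival_timestamp".desc)

// row_number keeps exactly one row per key even when arrival_timestamp ties.
val latest = mdf
  .withColumn("rn", row_number().over(w))
  .where($"rn" === 1)
  .drop("rn")

latest.show(false)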