I have this dataframe:
+------+-------------------+-----------+------------------------+------------------------+
|brand |original_timestamp |weight |arrival_timestamp |features |
+------+-------------------+-----------+------------------------+------------------------+
|BR1 |1632899456 |4.0 |2023-08-09 17:12:24.002 |{f1 -> 12, f2 -> 52} |
|BR1 |1632899456 |4.0 |2023-08-09 17:14:24.002 |{f1 -> 42, f2 -> 12} |
|BR1 |1632899456 |2.0 |2023-08-09 17:46:24.002 |{f1 -> 42, f2 -> 12} |
|BR3 |1632899155 |2.0 |2023-08-09 17:14:24.002 |{f1 -> 72, f2 -> 50} |
|BR3 |1632899155 |9.0 |2023-08-09 17:20:24.002 |{f1 -> 72, f2 -> 50} |
+------+-------------------+-----------+------------------------+------------------------+
I want to get the latest `arrival_timestamp` for each combination of brand, original_timestamp, and features. Here is the code I'm using:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val windowSpec = Window.partitionBy("brand","original_timestamp","features").orderBy(col("arrival_timestamp").desc)
df.withColumn("maxTS", first("arrival_timestamp").over(windowSpec))
.select("*").where(col("maxTS") === col("arrival_timestamp"))
.drop("maxTS")
.show(false)
This is the output I'm expecting:
+------+-------------------+-----------+------------------------+------------------------+
|brand |original_timestamp |weight |arrival_timestamp |features |
+------+-------------------+-----------+------------------------+------------------------+
|BR1 |1632899456 |4.0 |2023-08-09 17:12:24.002 |{f1 -> 12, f2 -> 52} |
|BR1 |1632899456 |2.0 |2023-08-09 17:46:24.002 |{f1 -> 42, f2 -> 12} |
|BR3 |1632899155 |9.0 |2023-08-09 17:20:24.002 |{f1 -> 72, f2 -> 50} |
+------+-------------------+-----------+------------------------+------------------------+
However, I'm getting an error like this:

Grouping/join/window partition key cannot be map type.
java.lang.IllegalStateException: Grouping/join/window partition key cannot be map type.

That's because the features column is a map.
Is there another way to do this grouping?
I'm using Spark version 3.2.2 with SQLContext, and the Scala language.
Use the `to_json` function to convert the `map` column to a `string` and pass the result into `Window.partitionBy`. Check the example solution below.
scala> mdf.show(false)
+-----+------------------+------+-----------------------+--------------------+
|brand|original_timestamp|weight|arrival_timestamp |features |
+-----+------------------+------+-----------------------+--------------------+
|BR1 |1632899456 |4.0 |2023-08-09 17:12:24.002|{f1 -> 12, f2 -> 52}|
|BR1 |1632899456 |4.0 |2023-08-09 17:14:24.002|{f1 -> 42, f2 -> 12}|
|BR1 |1632899456 |2.0 |2023-08-09 17:46:24.002|{f1 -> 42, f2 -> 12}|
|BR3 |1632899155 |2.0 |2023-08-09 17:14:24.002|{f1 -> 72, f2 -> 50}|
|BR3 |1632899155 |9.0 |2023-08-09 17:20:24.002|{f1 -> 72, f2 -> 50}|
+-----+------------------+------+-----------------------+--------------------+
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
scala> import org.apache.spark.sql.expressions._
import org.apache.spark.sql.expressions._
scala> val windowSpec = Window.partitionBy($"brand",$"original_timestamp",to_json($"features").as("features")).orderBy(col("arrival_timestamp").desc)
windowSpec: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@10b34165
scala> mdf.withColumn("maxTs", first($"arrival_timestamp").over(windowSpec)).where($"maxTs" === $"arrival_timestamp").drop("maxTs").show(false)
+-----+------------------+------+-----------------------+--------------------+
|brand|original_timestamp|weight|arrival_timestamp |features |
+-----+------------------+------+-----------------------+--------------------+
|BR1 |1632899456 |4.0 |2023-08-09 17:12:24.002|{f1 -> 12, f2 -> 52}|
|BR1 |1632899456 |2.0 |2023-08-09 17:46:24.002|{f1 -> 42, f2 -> 12}|
|BR3 |1632899155 |9.0 |2023-08-09 17:20:24.002|{f1 -> 72, f2 -> 50}|
+-----+------------------+------+-----------------------+--------------------+
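One caveat with the `first(...)` + filter pattern: it keeps every row that ties for the latest `arrival_timestamp` in a group. If exactly one row per group is wanted even when timestamps tie, a `row_number` over the same window works too. A minimal sketch under the same setup (assuming `mdf` and `spark.implicits._` from the spark-shell session above):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Same trick: serialize the map column with to_json so it can be a partition key.
val winSpec = Window
  .partitionBy($"brand", $"original_timestamp", to_json($"features"))
  .orderBy($"arrival_timestamp".desc)

// row_number() assigns 1 to the newest row in each group, so keeping rn === 1
// returns exactly one row per group, even on ties.
mdf.withColumn("rn", row_number().over(winSpec))
  .where($"rn" === 1)
  .drop("rn")
  .show(false)
```

Note that `to_json` serializes the map in its stored entry order, so two maps with the same entries in a different order would land in different partitions; if that can happen in your data, normalize the map (e.g. sort its entries) before serializing.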