我有这个数据框:
+------+-------------------+-----------+------------------------+------------------------+
|brand |original_timestamp |weight |arrival_timestamp |features |
+------+-------------------+-----------+------------------------+------------------------+
|BR1 |1632899456 |4.0 |2023-08-09 17:12:24.002 |{f1 -> 12, f2 -> 52} |
|BR1 |1632899456 |4.0 |2023-08-09 17:14:24.002 |{f1 -> 42, f2 -> 12} |
|BR1 |1632899456 |2.0 |2023-08-09 17:46:24.002 |{f1 -> 42, f2 -> 12} |
|BR3 |1632899155 |2.0 |2023-08-09 17:14:24.002 |{f1 -> 72, f2 -> 50} |
|BR3 |1632899155 |9.0 |2023-08-09 17:20:24.002 |{f1 -> 72, f2 -> 50} |
我想获取以下每种组合的最新“arrival_timestamp”: 品牌、原始时间戳和功能。 这是我正在使用的代码:
import org.apache.spark.sql.functions._
val windowSpec = Window.partitionBy("brand","original_timestamp","features").orderBy(col("arrival_timestamp").desc)
df.withColumn("maxTS", first("arrival_timestamp").over(windowSpec))
.select("*").where(col("maxTS") === col("arrival_timestamp"))
.drop("maxTS")
.show(false)
这是我期待的输出:
+------+-------------------+-----------+------------------------+------------------------+
|brand |original_timestamp |weight |arrival_timestamp |features |
+------+-------------------+-----------+------------------------+------------------------+
|BR1 |1632899456 |4.0 |2023-08-09 17:12:24.002 |{f1 -> 12, f2 -> 52} |
|BR1 |1632899456 |2.0 |2023-08-09 17:46:24.002 |{f1 -> 42, f2 -> 12} |
|BR3 |1632899155 |9.0 |2023-08-09 17:20:24.002 |{f1 -> 72, f2 -> 50} |
但是,我收到这样的错误:
分组/连接/窗口分区键不能是map类型。
java.lang.IllegalStateException:分组/加入/窗口分区键不能是映射类型。
那是因为列特征是一个地图。
还有其他方法可以进行此分组吗?
我正在使用 Spark 版本 3.2.2 和 SQLContext,以及 scala 语言。
编辑:这是在 SPARK 结构化流的背景下
使用
map
函数将 string
类型转换为某种 to_json
类型,并将其传递到 Window.partitionBy
函数中。检查下面的示例解决方案。
scala> mdf.show(false)
+-----+------------------+------+-----------------------+--------------------+
|brand|original_timestamp|weight|arrival_timestamp |features |
+-----+------------------+------+-----------------------+--------------------+
|BR1 |1632899456 |4.0 |2023-08-09 17:12:24.002|{f1 -> 12, f2 -> 52}|
|BR1 |1632899456 |4.0 |2023-08-09 17:14:24.002|{f1 -> 42, f2 -> 12}|
|BR1 |1632899456 |2.0 |2023-08-09 17:46:24.002|{f1 -> 42, f2 -> 12}|
|BR3 |1632899155 |2.0 |2023-08-09 17:14:24.002|{f1 -> 72, f2 -> 50}|
|BR3 |1632899155 |9.0 |2023-08-09 17:20:24.002|{f1 -> 72, f2 -> 50}|
+-----+------------------+------+-----------------------+--------------------+
scala> import org.apache.spark.sql.expressions._
import org.apache.spark.sql.expressions._
scala> val windowSpec = Window.partitionBy($"brand",$"original_timestamp",to_json($"features").as("features")).orderBy(col("arrival_timestamp").desc)
windowSpec: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@10b34165
scala> mdf.withColumn("maxTs", first($"arrival_timestamp").over(windowSpec)).where($"maxTs" === $"arrival_timestamp").drop("maxTs").show(false)
+-----+------------------+------+-----------------------+--------------------+
|brand|original_timestamp|weight|arrival_timestamp |features |
+-----+------------------+------+-----------------------+--------------------+
|BR1 |1632899456 |4.0 |2023-08-09 17:12:24.002|{f1 -> 12, f2 -> 52}|
|BR1 |1632899456 |2.0 |2023-08-09 17:46:24.002|{f1 -> 42, f2 -> 12}|
|BR3 |1632899155 |9.0 |2023-08-09 17:20:24.002|{f1 -> 72, f2 -> 50}|
+-----+------------------+------+-----------------------+--------------------+
请注意,to_json 有两个显着的功能缺陷:
它还会对性能产生影响,尽管在这种情况下不太明显,因为您可能会忽略之后的结果。
为什么关键顺序很重要?
spark.sql("select map('b', 'b', 'a', 'a') amap, map('a', 'a', 'b', 'b') bmap").
selectExpr("*", "to_json(amap) jamap", "to_json(bmap) jbmap").
selectExpr("*", "(jamap = jbmap) same").
show
产量:
+----------------+----------------+-----------------+-----------------+-----+
| amap| bmap| jamap| jbmap| same|
+----------------+----------------+-----------------+-----------------+-----+
|{b -> b, a -> a}|{a -> a, b -> b}|{"b":"b","a":"a"}|{"a":"a","b":"b"}|false|
+----------------+----------------+-----------------+-----------------+-----+
虽然地图是相同的,但上面的相同列是 false,因为 json 表示形式不同。
在您的情况下,首先通过键排序然后是 to_json 似乎就足够了,但是如果存在映射嵌套或者您将字符串键交换为其他类型,您将需要另一个解决方案。
为了向地图添加通用比较、分组依据等,无论嵌套有多深,我在 Quality 中添加了comparable_maps。一个已知的限制是类型内的间隔也是不可比较的(并且在不同的 Spark 版本上有不同的表示)。
com.sparkutils.quality.registerQualityFunctions()
sparkSession.sql("select map('b', 'b', 'a', 'a') amap, map('a', 'a', 'b', 'b') bmap").
selectExpr("*", "comparable_maps(amap) camap", "comparable_maps(bmap) cbmap").
selectExpr("*", "(camap = cbmap) same").
show
产量:
+----------------+----------------+----------------+----------------+----+
| amap| bmap| camap| cbmap|same|
+----------------+----------------+----------------+----------------+----+
|{b -> b, a -> a}|{a -> a, b -> b}|[{a, a}, {b, b}]|[{a, a}, {b, b}]|true|
+----------------+----------------+----------------+----------------+----+