获取具有最新时间戳的数据帧行(Spark 结构化流)

问题描述 投票:0回答:2

我有这个数据框:

+------+-------------------+-----------+------------------------+------------------------+
|brand |original_timestamp |weight     |arrival_timestamp       |features                |
+------+-------------------+-----------+------------------------+------------------------+
|BR1   |1632899456         |4.0        |2023-08-09 17:12:24.002 |{f1 -> 12, f2 -> 52}    |
|BR1   |1632899456         |4.0        |2023-08-09 17:14:24.002 |{f1 -> 42, f2 -> 12}    |
|BR1   |1632899456         |2.0        |2023-08-09 17:46:24.002 |{f1 -> 42, f2 -> 12}    |
|BR3   |1632899155         |2.0        |2023-08-09 17:14:24.002 |{f1 -> 72, f2 -> 50}    |
|BR3   |1632899155         |9.0        |2023-08-09 17:20:24.002 |{f1 -> 72, f2 -> 50}    |

我想获取以下每种组合的最新“arrival_timestamp”: 品牌、原始时间戳和功能。 这是我正在使用的代码:

import org.apache.spark.sql.functions._
val windowSpec = Window.partitionBy("brand","original_timestamp","features").orderBy(col("arrival_timestamp").desc)

df.withColumn("maxTS", first("arrival_timestamp").over(windowSpec))
  .select("*").where(col("maxTS") === col("arrival_timestamp"))
  .drop("maxTS")
  .show(false)

这是我期待的输出:

+------+-------------------+-----------+------------------------+------------------------+
|brand |original_timestamp |weight     |arrival_timestamp       |features                |
+------+-------------------+-----------+------------------------+------------------------+
|BR1   |1632899456         |4.0        |2023-08-09 17:12:24.002 |{f1 -> 12, f2 -> 52}    |

|BR1   |1632899456         |2.0        |2023-08-09 17:46:24.002 |{f1 -> 42, f2 -> 12}    |

|BR3   |1632899155         |9.0        |2023-08-09 17:20:24.002 |{f1 -> 72, f2 -> 50}    |

但是,我收到这样的错误:

分组/连接/窗口分区键不能是map类型。

java.lang.IllegalStateException:分组/加入/窗口分区键不能是映射类型。

那是因为列特征是一个地图。

还有其他方法可以进行此分组吗?

我正在使用 Spark 版本 3.2.2 和 SQLContext,以及 scala 语言。

编辑这是在 SPARK 结构化流的背景下

dataframe scala apache-spark apache-spark-sql spark-structured-streaming
2个回答
1
投票

使用

map
函数将
string
类型转换为某种
to_json
类型,并将其传递到
Window.partitionBy
函数中。检查下面的示例解决方案。

scala> mdf.show(false)
+-----+------------------+------+-----------------------+--------------------+
|brand|original_timestamp|weight|arrival_timestamp      |features            |
+-----+------------------+------+-----------------------+--------------------+
|BR1  |1632899456        |4.0   |2023-08-09 17:12:24.002|{f1 -> 12, f2 -> 52}|
|BR1  |1632899456        |4.0   |2023-08-09 17:14:24.002|{f1 -> 42, f2 -> 12}|
|BR1  |1632899456        |2.0   |2023-08-09 17:46:24.002|{f1 -> 42, f2 -> 12}|
|BR3  |1632899155        |2.0   |2023-08-09 17:14:24.002|{f1 -> 72, f2 -> 50}|
|BR3  |1632899155        |9.0   |2023-08-09 17:20:24.002|{f1 -> 72, f2 -> 50}|
+-----+------------------+------+-----------------------+--------------------+
scala> import org.apache.spark.sql.expressions._
import org.apache.spark.sql.expressions._

scala> val windowSpec = Window.partitionBy($"brand",$"original_timestamp",to_json($"features").as("features")).orderBy(col("arrival_timestamp").desc)
windowSpec: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@10b34165

scala> mdf.withColumn("maxTs", first($"arrival_timestamp").over(windowSpec)).where($"maxTs" === $"arrival_timestamp").drop("maxTs").show(false)
+-----+------------------+------+-----------------------+--------------------+
|brand|original_timestamp|weight|arrival_timestamp      |features            |
+-----+------------------+------+-----------------------+--------------------+
|BR1  |1632899456        |4.0   |2023-08-09 17:12:24.002|{f1 -> 12, f2 -> 52}|
|BR1  |1632899456        |2.0   |2023-08-09 17:46:24.002|{f1 -> 42, f2 -> 12}|
|BR3  |1632899155        |9.0   |2023-08-09 17:20:24.002|{f1 -> 72, f2 -> 50}|
+-----+------------------+------+-----------------------+--------------------+

1
投票

请注意,to_json 有两个显着的功能缺陷:

  1. 它不会订购钥匙(没有选项可以这样做)
  2. 它只支持字符串键(json限制,但似乎与这个问题无关)

它还会对性能产生影响,尽管在这种情况下不太明显,因为您可能会忽略之后的结果。

为什么关键顺序很重要?

spark.sql("select map('b', 'b', 'a', 'a') amap, map('a', 'a', 'b', 'b') bmap").
      selectExpr("*", "to_json(amap) jamap", "to_json(bmap) jbmap").
      selectExpr("*", "(jamap = jbmap) same").
      show

产量:

+----------------+----------------+-----------------+-----------------+-----+
|            amap|            bmap|            jamap|            jbmap| same|
+----------------+----------------+-----------------+-----------------+-----+
|{b -> b, a -> a}|{a -> a, b -> b}|{"b":"b","a":"a"}|{"a":"a","b":"b"}|false|
+----------------+----------------+-----------------+-----------------+-----+

虽然地图是相同的,但上面的相同列是 false,因为 json 表示形式不同。

在您的情况下,首先通过键排序然后是 to_json 似乎就足够了,但是如果存在映射嵌套或者您将字符串键交换为其他类型,您将需要另一个解决方案。

为了向地图添加通用比较、分组依据等,无论嵌套有多深,我在 Quality 中添加了

comparable_maps。一个已知的限制是类型内的间隔也是不可比较的(并且在不同的 Spark 版本上有不同的表示)。

com.sparkutils.quality.registerQualityFunctions() sparkSession.sql("select map('b', 'b', 'a', 'a') amap, map('a', 'a', 'b', 'b') bmap"). selectExpr("*", "comparable_maps(amap) camap", "comparable_maps(bmap) cbmap"). selectExpr("*", "(camap = cbmap) same"). show
产量:

+----------------+----------------+----------------+----------------+----+ | amap| bmap| camap| cbmap|same| +----------------+----------------+----------------+----------------+----+ |{b -> b, a -> a}|{a -> a, b -> b}|[{a, a}, {b, b}]|[{a, a}, {b, b}]|true| +----------------+----------------+----------------+----------------+----+
    
© www.soinside.com 2019 - 2024. All rights reserved.