如何在 pyspark 结构化流中返回每组的最新行

问题描述 投票:0回答:3

我有一个使用

spark.readStream.format('delta')
在 pyspark 中读取的流。数据由多列组成,包括
type
date
value
列。

示例数据框;

类型 日期 价值
1 2020-01-21 6
1 2020-01-16 5
2 2020-01-20 8
2 2020-01-15 4

我想创建一个 DataFrame 来跟踪每种类型的最新

state
。处理静态(批处理)数据时最简单的方法之一是使用窗口,但不支持在非时间戳列上使用窗口。另一种选择看起来像

stream.groupby('type').agg(last('date'), last('value')).writeStream

但我认为 Spark 无法保证此处的顺序,并且在聚合之前的结构化流中也不支持使用

orderBy

您对如何应对这一挑战有什么建议吗?

apache-spark pyspark spark-structured-streaming
3个回答
0
投票

简单使用可以通过

from pyspark.sql.functions import *
导入的 to_timestamp() 函数 日期列上,以便您使用窗口函数。 例如

from pyspark.sql.functions import *

df=spark.createDataFrame(
        data = [ ("1","2020-01-21")],
        schema=["id","input_timestamp"])
df.printSchema()

+---+---------------+-------------------+
|id |input_timestamp|timestamp          |
+---+---------------+-------------------+
|1  |2020-01-21     |2020-01-21 00:00:00|
+---+---------------+-------------------+

0
投票

“但不支持在非时间戳列上使用窗口” 你是从流的角度这么说的吗?因为我也能做到。

这是您问题的解决方案。

windowSpec  = Window.partitionBy("type").orderBy("date")
df1=df.withColumn("rank",rank().over(windowSpec))
df1.show()

+----+----------+-----+----+
|type|      date|value|rank|
+----+----------+-----+----+
|   1|2020-01-16|    5|   1|
|   1|2020-01-21|    6|   2|
|   2|2020-01-15|    4|   1|
|   2|2020-01-20|    8|   2|
+----+----------+-----+----+

w = Window.partitionBy('type')
df1.withColumn('maxB', F.max('rank').over(w)).where(F.col('rank') == F.col('maxB')).drop('maxB').show()

+----+----------+-----+----+
|type|      date|value|rank|
+----+----------+-----+----+
|   1|2020-01-21|    6|   2|
|   2|2020-01-20|    8|   2|
+----+----------+-----+----+

0
投票

尝试使用 pyspark 窗口函数和分区最大日期。

流处理可能会拒绝聚合。尝试对每个批次应用上述方法。

© www.soinside.com 2019 - 2024. All rights reserved.