我有一个使用
spark.readStream.format('delta')
在 pyspark 中读取的流。数据由多列组成,包括 type
、date
和 value
列。
示例数据框;
类型 | 日期 | 价值 |
---|---|---|
1 | 2020-01-21 | 6 |
1 | 2020-01-16 | 5 |
2 | 2020-01-20 | 8 |
2 | 2020-01-15 | 4 |
我想创建一个 DataFrame 来跟踪每种类型的最新
state
。处理静态(批处理)数据时最简单的方法之一是使用窗口,但不支持在非时间戳列上使用窗口。另一种选择看起来像
stream.groupby('type').agg(last('date'), last('value')).writeStream
但我认为 Spark 无法保证此处的顺序,并且在聚合之前的结构化流中也不支持使用
orderBy
。
您对如何应对这一挑战有什么建议吗?
简单使用可以通过
from pyspark.sql.functions import *
导入的 to_timestamp() 函数
日期列上,以便您使用窗口函数。
例如
from pyspark.sql.functions import *
df=spark.createDataFrame(
data = [ ("1","2020-01-21")],
schema=["id","input_timestamp"])
df.printSchema()
+---+---------------+-------------------+
|id |input_timestamp|timestamp |
+---+---------------+-------------------+
|1 |2020-01-21 |2020-01-21 00:00:00|
+---+---------------+-------------------+
“但不支持在非时间戳列上使用窗口” 你是从流的角度这么说的吗?因为我也能做到。
这是您问题的解决方案。
windowSpec = Window.partitionBy("type").orderBy("date")
df1=df.withColumn("rank",rank().over(windowSpec))
df1.show()
+----+----------+-----+----+
|type| date|value|rank|
+----+----------+-----+----+
| 1|2020-01-16| 5| 1|
| 1|2020-01-21| 6| 2|
| 2|2020-01-15| 4| 1|
| 2|2020-01-20| 8| 2|
+----+----------+-----+----+
w = Window.partitionBy('type')
df1.withColumn('maxB', F.max('rank').over(w)).where(F.col('rank') == F.col('maxB')).drop('maxB').show()
+----+----------+-----+----+
|type| date|value|rank|
+----+----------+-----+----+
| 1|2020-01-21| 6| 2|
| 2|2020-01-20| 8| 2|
+----+----------+-----+----+
尝试使用 pyspark 窗口函数和分区最大日期。
流处理可能会拒绝聚合。尝试对每个批次应用上述方法。