需要在pysaprk流作业中执行聚合操作

问题描述 投票:0回答:1

我每隔 1 分钟就会将多个传感器的数据流接收到数据块中。如果传感器“ABC”和“DEF”可用于每个 pyspark 流负载,并且传感器的值应为 ABC 和 DEF 值的平均值,则需要创建新的传感器名称“PQRS”。

Input data(1 min stream)

传感器名称 价值 时间戳
ABC 10 2023-11-02T11:49:32.028Z
防御 20 2023-11-02T11:49:32.028Z
GHI 12 2023-11-02T11:49:32.028Z

输出数据

传感器名称 价值 时间戳
ABC 10 2023-11-02T11:49:32.028Z
防御 20 2023-11-02T11:49:32.028Z
PQRS 15 2023-11-02T11:49:32.028Z
GHI 12 2023-11-02T11:49:32.028Z
pyspark databricks spark-streaming azure-iot-hub
1个回答
0
投票

将流数据接收到数据帧后,使用以下代码来实现您的要求。

from pyspark.sql.functions import *

#Find the average with required condition
temp_df = input_stream.filter((input_stream.sensor_name == "ABC") | (input_stream.sensor_name == "DEF")).groupBy("timestamp").agg(avg("value").alias("avg_value"))

#new dataframe with sensor pqrs data and average
pqrs_df = temp_df.withColumn("sensor_name", lit("PQRS"))
pqrs_df.show()

#Union with your stream dataframe
res_df = input_stream.union(pqrs_df.select("sensor_name", "avg_value", "timestamp"))

#Write this dataframe to your streaming sink
res_df.display()

这里,这将过滤具有

ABC
DEF
sensor_name
的行,并计算
value
的平均值。

然后,创建一个新数据帧以添加

PQRS
传感器行,并将其与流数据帧合并,并将结果存储到另一个数据帧中。您可以根据您的要求将此数据框写入您的接收器。

上述代码的输出:

| sensor_name | value | timestamp                |
|-------------|-------|--------------------------|
| ABC         | 10    | 2023-11-02T11:49:32.028Z |
| DEF         | 20    | 2023-11-02T11:49:32.028Z |
| GHI         | 12    | 2023-11-02T11:49:32.028Z |
| PQRS        | 15    | 2023-11-02T11:49:32.028Z |
© www.soinside.com 2019 - 2024. All rights reserved.