我每隔 1 分钟就会将多个传感器的数据流接收到数据块中。如果传感器“ABC”和“DEF”可用于每个 pyspark 流负载,并且传感器的值应为 ABC 和 DEF 值的平均值,则需要创建新的传感器名称“PQRS”。
Input data(1 min stream)
传感器名称 | 价值 | 时间戳 |
---|---|---|
ABC | 10 | 2023-11-02T11:49:32.028Z |
防御 | 20 | 2023-11-02T11:49:32.028Z |
GHI | 12 | 2023-11-02T11:49:32.028Z |
输出数据
传感器名称 | 价值 | 时间戳 |
---|---|---|
ABC | 10 | 2023-11-02T11:49:32.028Z |
防御 | 20 | 2023-11-02T11:49:32.028Z |
PQRS | 15 | 2023-11-02T11:49:32.028Z |
GHI | 12 | 2023-11-02T11:49:32.028Z |
将流数据接收到数据帧后,使用以下代码来实现您的要求。
from pyspark.sql.functions import *
#Find the average with required condition
temp_df = input_stream.filter((input_stream.sensor_name == "ABC") | (input_stream.sensor_name == "DEF")).groupBy("timestamp").agg(avg("value").alias("avg_value"))
#new dataframe with sensor pqrs data and average
pqrs_df = temp_df.withColumn("sensor_name", lit("PQRS"))
pqrs_df.show()
#Union with your stream dataframe
res_df = input_stream.union(pqrs_df.select("sensor_name", "avg_value", "timestamp"))
#Write this dataframe to your streaming sink
res_df.display()
这里,这将过滤具有
ABC
和 DEF
为 sensor_name
的行,并计算 value
的平均值。
然后,创建一个新数据帧以添加
PQRS
传感器行,并将其与流数据帧合并,并将结果存储到另一个数据帧中。您可以根据您的要求将此数据框写入您的接收器。
上述代码的输出:
| sensor_name | value | timestamp |
|-------------|-------|--------------------------|
| ABC | 10 | 2023-11-02T11:49:32.028Z |
| DEF | 20 | 2023-11-02T11:49:32.028Z |
| GHI | 12 | 2023-11-02T11:49:32.028Z |
| PQRS | 15 | 2023-11-02T11:49:32.028Z |