我正在编写一个 pyflink 程序,用于使用指数加权移动平均值对进入 kafka 主题的日志数量进行异常检测。主题所在的 kafka 代理正在我的本地计算机中运行
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.datastream.formats.json import JsonRowDeserializationSchema
from pyflink.datastream.window import TumblingProcessingTimeWindows
from pyflink.datastream.functions import AggregateFunction, MapFunction
from pyflink.common.time import Time
from datetime import datetime, timedelta
class SimpleCountAggregateFunction(AggregateFunction):
def create_accumulator(self):
return (0, None) # Include a placeholder for timestamp
def add(self, value, accumulator):
# Assume value is a tuple of (count, timestamp)
count, timestamp = value
if count == 0:
timestamp = datetime.now() # Update timestamp for zero count window
return (accumulator[0] + 1, timestamp)
def get_result(self, accumulator):
return accumulator
def merge(self, a, b):
return (a[0] + b[0], a[1])
class EMACalculatorWithPrediction(MapFunction):
def __init__(self, period, start_time):
self.period = period
self.smoothing_factor = 2 / (1 + period)
self.ema = None
# Convert start_time from datetime to timestamp for simpler handling
self.window_start_time = start_time
def map(self, value):
count, _ = value # Unpack the count, ignore the incoming timestamp
original_value = float(count)
if self.ema is None:
self.ema = original_value
else:
self.ema = (original_value * self.smoothing_factor) + (self.ema * (1 - self.smoothing_factor))
current_time = self.window_start_time.strftime("%Y-%m-%d %H:%M:%S")
# Update start_time for the next window
self.window_start_time += timedelta(seconds=60)
# Check for anomaly
if original_value > 0 and ((original_value - self.ema) / original_value) > 0.3:
# Report anomaly
return (current_time, original_value, self.ema, "Anomaly Detected")
else:
return (current_time, original_value, self.ema, "No Anomaly")
def main():
env = StreamExecutionEnvironment.get_execution_environment()
kafka_source = FlinkKafkaConsumer(
topics="input-events",
deserialization_schema=JsonRowDeserializationSchema.builder().type_info(
type_info=Types.ROW_NAMED(["timestamp", "data"], [Types.INSTANT(), Types.STRING()])).build(),
properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'count-group'}
)
data_stream = env.add_source(kafka_source)
# Convert to a tuple of (count, timestamp) before windowing
mapped_stream = data_stream.map(lambda x: (1, x[0]), output_type=Types.TUPLE([Types.LONG(), Types.INSTANT()]))
counted_stream = mapped_stream \
.window_all(TumblingProcessingTimeWindows.of(Time.seconds(60))) \
.aggregate(SimpleCountAggregateFunction(), output_type=Types.TUPLE([Types.LONG(), Types.INSTANT()])) \
.map(lambda count_timestamp: (str(count_timestamp[0]), count_timestamp[1]), output_type=Types.TUPLE([Types.STRING(), Types.INSTANT()]))
# Initialize EMACalculatorWithPrediction with the current system time as the start time for the first window
initial_window_start_time = datetime.now()
ema_and_value_stream = counted_stream.map(EMACalculatorWithPrediction(period=10, start_time=initial_window_start_time),
output_type=Types.TUPLE([Types.STRING(), Types.FLOAT(), Types.FLOAT(), Types.STRING()]))
ema_and_value_stream.map(lambda x: f"Timestamp: {x[0]}, Original: {x[1]}, EWMA: {x[2]}, Status: {x[3]}", output_type=Types.STRING()).print()
env.execute("Kafka JSON Count, EWMA Calculation, and Anomaly Detection")
if __name__ == "__main__":
main()
代码使用滚动窗口计算进入 kafka 主题的日志数量,代码的 ewma(指数加权移动平均值)和异常检测部分工作正常,但问题出现在代码的计数部分。计数当有日志进入主题时,滚动窗口的工作原理,但是,当日志数量为零时,计数不起作用,我想修改代码,使其在计数为零时工作
我为解决这个问题所做的事情是 •编写 if else 条件并将其与 0 进行比较 •改变地图函数的计算方式
任何其他帮助我修复逻辑的建议将不胜感激,我在过去的 6 个小时里一直被困在这里
当您使用 DataStream API 时,在将第一条记录分配给窗口之前,窗口并不存在,因此空窗口没有机会生成结果。
我的建议是使用 KeyedProcessFunction 而不是窗口来重写它。我建议使用 KeyedProcessFuntion 而不是 ProcessFunction,因为您需要保留一些状态(计数器),并且您需要一个计时器,而计时器仅适用于键控流。由于对流进行键分区没有意义,因此您可以使用返回常量的键选择器函数 - 即为每个事件分配相同的键。
按键处理功能应该非常简单: