A Flink program for anomaly detection produces no output when the number of logs ingested into the Kafka topic is zero


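I am writing a PyFlink program that detects anomalies in the number of logs arriving on a Kafka topic, using an exponentially weighted moving average (EWMA). The Kafka broker hosting the topic runs on my local machine.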

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.datastream.formats.json import JsonRowDeserializationSchema
from pyflink.datastream.window import TumblingProcessingTimeWindows
from pyflink.datastream.functions import AggregateFunction, MapFunction
from pyflink.common.time import Time
from datetime import datetime, timedelta

class SimpleCountAggregateFunction(AggregateFunction):
    def create_accumulator(self):
        return (0, None)  # Include a placeholder for timestamp

    def add(self, value, accumulator):
        # Assume value is a tuple of (count, timestamp)
        count, timestamp = value
        if count == 0:
            timestamp = datetime.now()  # Update timestamp for zero count window
        return (accumulator[0] + 1, timestamp)

    def get_result(self, accumulator):
        return accumulator

    def merge(self, a, b):
        return (a[0] + b[0], a[1])

class EMACalculatorWithPrediction(MapFunction):
    def __init__(self, period, start_time):
        self.period = period
        self.smoothing_factor = 2 / (1 + period)
        self.ema = None
        # Convert start_time from datetime to timestamp for simpler handling
        self.window_start_time = start_time

    def map(self, value):
        count, _ = value  # Unpack the count, ignore the incoming timestamp
        original_value = float(count)
        if self.ema is None:
            self.ema = original_value
        else:
            self.ema = (original_value * self.smoothing_factor) + (self.ema * (1 - self.smoothing_factor))

        current_time = self.window_start_time.strftime("%Y-%m-%d %H:%M:%S")
        # Update start_time for the next window
        self.window_start_time += timedelta(seconds=60)

        # Check for anomaly
        if original_value > 0 and ((original_value - self.ema) / original_value) > 0.3:
            # Report anomaly
            return (current_time, original_value, self.ema, "Anomaly Detected")
        else:
            return (current_time, original_value, self.ema, "No Anomaly")

def main():
    env = StreamExecutionEnvironment.get_execution_environment()

    kafka_source = FlinkKafkaConsumer(
        topics="input-events",
        deserialization_schema=JsonRowDeserializationSchema.builder().type_info(
            type_info=Types.ROW_NAMED(["timestamp", "data"], [Types.INSTANT(), Types.STRING()])).build(),
        properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'count-group'}
    )

    data_stream = env.add_source(kafka_source)

    # Convert to a tuple of (count, timestamp) before windowing
    mapped_stream = data_stream.map(lambda x: (1, x[0]), output_type=Types.TUPLE([Types.LONG(), Types.INSTANT()]))

    counted_stream = mapped_stream \
        .window_all(TumblingProcessingTimeWindows.of(Time.seconds(60))) \
        .aggregate(SimpleCountAggregateFunction(), output_type=Types.TUPLE([Types.LONG(), Types.INSTANT()])) \
        .map(lambda count_timestamp: (str(count_timestamp[0]), count_timestamp[1]), output_type=Types.TUPLE([Types.STRING(), Types.INSTANT()]))

    # Initialize EMACalculatorWithPrediction with the current system time as the start time for the first window
    initial_window_start_time = datetime.now()
    ema_and_value_stream = counted_stream.map(EMACalculatorWithPrediction(period=10, start_time=initial_window_start_time), 
                                              output_type=Types.TUPLE([Types.STRING(), Types.FLOAT(), Types.FLOAT(), Types.STRING()]))

    
    ema_and_value_stream.map(lambda x: f"Timestamp: {x[0]}, Original: {x[1]}, EWMA: {x[2]}, Status: {x[3]}", output_type=Types.STRING()).print()

    env.execute("Kafka JSON Count, EWMA Calculation, and Anomaly Detection")

if __name__ == "__main__":
    main()

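The code counts the logs arriving on the Kafka topic using a tumbling window. The EWMA (exponentially weighted moving average) and anomaly-detection parts work fine; the problem is in the counting part. The tumbling-window count works when logs arrive on the topic, but when the number of logs is zero, no count is produced. I want to modify the code so that it also works when the count is zero.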

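What I have tried so far to solve this:

  • Writing an if/else condition that compares the count with 0
  • Changing how the map function does its calculation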

Any other suggestions for fixing the logic would be much appreciated; I have been stuck on this for the past 6 hours.

apache-flink flink-streaming pyflink
1 Answer

With the DataStream API, a window does not exist until the first record is assigned to it, so an empty window never gets a chance to produce a result.
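To illustrate the point above, here is a minimal plain-Python model (not Flink code) of tumbling windows that are created lazily, only when an event is assigned to them. The function name `tumbling_window_counts` and the millisecond timestamps are hypothetical, chosen for illustration; real Flink windows also involve triggers and managed state, which this sketch ignores.

```python
from collections import defaultdict


def tumbling_window_counts(event_times_ms, size_ms):
    """Count events per tumbling window.

    A window entry is created only when an event falls into it,
    so an interval with no events never appears in the result.
    """
    windows = defaultdict(int)
    for t in event_times_ms:
        # Align the event time down to the start of its window.
        window_start = t - (t % size_ms)
        windows[window_start] += 1
    return dict(sorted(windows.items()))


if __name__ == "__main__":
    # Events at 1 s, 2 s and 130 s, with 60 s windows.
    print(tumbling_window_counts([1000, 2000, 130000], 60000))
```

With these inputs the result has entries for the windows starting at 0 and 120000 but none for the window starting at 60000. That gap corresponds to the missing zero-count output described in the question.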

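My suggestion is to rewrite this with a KeyedProcessFunction instead of a window. I recommend KeyedProcessFunction rather than ProcessFunction because you need to keep some state (the counter) and you need a timer, and timers are only available on keyed streams. Since there is no meaningful way to key-partition this stream, you can use a key selector function that returns a constant, i.e. one that assigns the same key to every event.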

The keyed process function should be quite simple:

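  • In the open() method, set up the state
  • In the processElement() method, if the counter is null, initialize it to 1 and register a timer; otherwise, increment the counter
  • In the onTimer() method, emit the result, reset the counter to 0, and register a new timer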
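The steps above can be sketched as a plain-Python simulation. This is not Flink code: `IntervalCounter` and `run` are hypothetical names, the `count` and `next_timer` attributes stand in for keyed state and a registered processing-time timer, and the replay loop stands in for Flink's timer service. In PyFlink the corresponding KeyedProcessFunction methods are named `process_element` and `on_timer`, which is why the sketch uses those names.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class IntervalCounter:
    """Stand-in for the keyed process function described above."""
    interval_ms: int
    count: Optional[int] = None       # models keyed state; None means "not initialised yet"
    next_timer: Optional[int] = None  # models the single registered timer
    emitted: List[Tuple[int, int]] = field(default_factory=list)

    def process_element(self, now_ms: int) -> None:
        if self.count is None:
            # First event ever: initialise the counter and register the first timer.
            self.count = 1
            self.next_timer = now_ms + self.interval_ms
        else:
            self.count += 1

    def on_timer(self, timestamp_ms: int) -> None:
        # Emit the count (possibly 0), reset it, and register the next timer.
        self.emitted.append((timestamp_ms, self.count))
        self.count = 0
        self.next_timer = timestamp_ms + self.interval_ms


def run(event_times_ms, interval_ms, end_ms):
    """Replay event times in order, firing any due timers before each event."""
    counter = IntervalCounter(interval_ms)
    for t in sorted(event_times_ms):
        while counter.next_timer is not None and counter.next_timer <= t:
            counter.on_timer(counter.next_timer)
        counter.process_element(t)
    # Fire the timers that fall between the last event and the end of the replay.
    while counter.next_timer is not None and counter.next_timer <= end_ms:
        counter.on_timer(counter.next_timer)
    return counter.emitted


if __name__ == "__main__":
    for fired_at, count in run([1000, 2000, 130000], 60000, 200000):
        print(fired_at, count)
```

Unlike the window version, the interval with no events still produces a result, with a count of 0, because the timer fires whether or not events arrived. The first timer is registered only when the first event arrives, so nothing is emitted before then.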