检查所有键是否都正确接收了流

问题描述 投票:0回答:3

我有以下情形:假设有20个传感器向我发送流式提要。我对流应用keyBy(sensorID),并执行一些操作,例如平均值等。此操作已实现且运行良好(使用Flink Java API)。

最初一切顺利,所有传感器都向我发送提要。一段时间后,可能会发生几个传感器工作异常的情况,我开始从中获取不规则的进给,例如我收到来自18个传感器的提要,但有2个长时间不发送给我。

我们可以假设我已经知道了sensorId的固定列表(可能是硬编码的/或在数据库中)。如何确定哪两个没有发送Feed?在哪里可以获取keyId的列表以与数据库中的列表进行比较?

[我想在没有提要的情况下发出警报(例如2分钟,5分钟,10分钟等,优先级提高)。

有人使用flink-streaming / patterns实现了这种情况吗?有任何建议请。

apache-flink flink-streaming
3个回答
0
投票

您可以从技术上使用ProcessFunction和计时器。

您可以简单地为每个记录注册计时器,并在收到数据时将其重置。如果您将计时器安排为在5分钟的处理时间后运行,则这基本上意味着,如果您尚未收到数据,它将调用函数onTimer,您可以从中发出一些警报。可以为已经触发的警报重新注册计时器,以允许发出更高严重性的警报。

请注意,这仅在最初假设所有传感器都正常工作的情况下才有效。具体来说,它将仅针对至少被查看过一次的键发出警报。但是从您的描述看来,它可以解决您的问题。


0
投票

我只是碰巧有一个这种模式的例子。需要进行一些调整以适合您的用例,但应该可以开始使用。

public class TimeoutFunction extends KeyedProcessFunction<String, Event, String> {

    private ValueState<Long> lastModifiedState;
    static final int TIMEOUT = 2 * 60 * 1000; // 2 minutes

    @Override
    public void open(Configuration parameters) throws Exception {

        // register our state with the state backend
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", Long.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<String> out) throws Exception {

        // update our state and timer
        Long current = lastModifiedState.value();
        if (current != null) {
            ctx.timerService().deleteEventTimeTimer(current + TIMEOUT);
        }
        current = max(current, event.timestamp());
        lastModifiedState.update(current);
        ctx.timerService().registerEventTimeTimer(current + TIMEOUT);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {

        // emit alert
        String deviceId = ctx.getCurrentKey();
        out.collect(deviceId);
    }
}

这假定一个主程序执行如下操作:

DataStream<String> result = stream
    .assignTimestampsAndWatermarks(new MyBoundedOutOfOrdernessAssigner(...))
    .keyBy(e -> e.deviceId)
    .process(new TimeoutFunction());

如@Dominik所说,这只会针对至少被查看过一次的密钥发出警报。您可以通过引入辅助事件源来解决此问题,该辅助事件源为应该存在的每个源创建一个人工事件,并将该流与主要源合并。


0
投票

现在我很清楚这种模式。我已经实施了该解决方案,它的工作原理就像魅力一样。

[如果有人需要代码,那么我很乐意分享

© www.soinside.com 2019 - 2024. All rights reserved.