Flink - timesOrMore的行为

问题描述 投票:0回答:1

我想找到随后发生的事件模式

内在模式是:

  1. 键“sensorArea”具有相同的值。
  2. 对于“customerId”键有不同的价值。
  3. 彼此在5秒内。

这种模式需要

  1. 仅在先前发生3次或更多次时发出“警报”。

我写了一些东西,但我知道它确实不完整。

两个问题

  1. 当我处于“下一个”模式时,我需要访问前面的事件字段,如何在不使用ctx命令的情况下执行此操作,因为它很重...
  2. 我的代码带来了奇怪的结果 - 这是我的输入

enter image description here

我的输出是

3> {first=[Customer[timestamp=50,customerId=111,toAdd=2,sensorData=33]], second=[Customer[timestamp=100,customerId=222,toAdd=2,sensorData=33], Customer[timestamp=600,customerId=333,toAdd=2,sensorData=33]]}

即使我想要的输出应该是所有前六个事件(用户111/222和传感器是33然后44然后55

Pattern<Customer, ?> sameUserDifferentSensor = Pattern.<Customer>begin("first", skipStrategy)
            .followedBy("second").where(new IterativeCondition<Customer>() {
                @Override
                public boolean filter(Customer currCustomerEvent, Context<Customer> ctx) throws Exception {
                    List<Customer> firstPatternEvents = Lists.newArrayList(ctx.getEventsForPattern("first"));
                    int i = firstPatternEvents.size();
                    int currSensorData = currCustomerEvent.getSensorData();
                    int prevSensorData = firstPatternEvents.get(i-1).getSensorData();
                    int currCustomerId = currCustomerEvent.getCustomerId();
                    int prevCustomerId = firstPatternEvents.get(i-1).getCustomerId();
                    return currSensorData==prevSensorData && currCustomerId!=prevCustomerId;
                }
            })
            .within(Time.seconds(5))
            .timesOrMore(3);



    PatternStream<Customer> sameUserDifferentSensorPatternStream = CEP.pattern(customerStream, sameUserDifferentSensor);
    DataStream<String> alerts1 = sameUserDifferentSensorPatternStream.select((PatternSelectFunction<Customer, String>) Object::toString);
apache-flink flink-streaming cep flink-cep
1个回答
1
投票

如果您首先通过sensorArea键入流,您将会有更轻松的时间。它们将在流上进行模式匹配,其中所有事件都针对单个sensorArea,这将使模式更容易表达,匹配更有效。

您无法避免使用迭代条件和ctx,但在键入流后它应该更便宜。

此外,您的代码示例与文本说明不匹配。文本说“在5秒内”和“3次或更多次”,而代码有within(Time.seconds(2))timesOrMore(2)

© www.soinside.com 2019 - 2024. All rights reserved.