My Flink pattern-detection program only returns the first match, not all matching patterns


I have a simple Flink application that tries to detect a pattern on an event stream created from the following text file:

1,A
2,B
3,C
4,A
5,C
6,B
7,D
8,D
9,A
10,D

I define the pattern like this:

        Pattern<DataEvent, ?> pattern = Pattern.<DataEvent>begin("start")
                .where(new SimpleCondition<DataEvent>() {
                    @Override
                    public boolean filter(DataEvent dataEvent) throws Exception {
                        return dataEvent.getType().equals("A");
                    }
                }).next("middle")
                .where(new SimpleCondition<DataEvent>() {
                    @Override
                    public boolean filter(DataEvent dataEvent) throws Exception {
                        return dataEvent.getType().equals("B");
                    }

                }).followedBy("end")
                .where(new SimpleCondition<DataEvent>() {
                    @Override
                    public boolean filter(DataEvent dataEvent) throws Exception {
                        return dataEvent.getType().equals("A");
                    }

                });
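(For reference: if I read the Flink CEP docs correctly, `followedBy` applies relaxed contiguity, pairing each start only with the *first* qualifying following event, while `followedByAny` applies non-deterministic relaxed contiguity and also matches later ones. A variant of the pattern using it would look like the sketch below; I am not sure whether this is the intended fix:)

```java
// Hypothetical variant: followedByAny lets "end" match every later "A",
// not just the first one after "middle". The after-match skip strategy
// (AfterMatchSkipStrategy.noSkip() is the documented default) additionally
// controls whether matching resumes or restarts after a match is emitted.
Pattern<DataEvent, ?> patternAny = Pattern.<DataEvent>begin("start")
        .where(new SimpleCondition<DataEvent>() {
            @Override
            public boolean filter(DataEvent dataEvent) {
                return dataEvent.getType().equals("A");
            }
        })
        .next("middle")
        .where(new SimpleCondition<DataEvent>() {
            @Override
            public boolean filter(DataEvent dataEvent) {
                return dataEvent.getType().equals("B");
            }
        })
        .followedByAny("end")   // instead of followedBy
        .where(new SimpleCondition<DataEvent>() {
            @Override
            public boolean filter(DataEvent dataEvent) {
                return dataEvent.getType().equals("A");
            }
        });
```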

and run pattern detection with patternStream.process like this:

DataStream<DataEvent> result = patternStream.process(new PatternProcessFunction<DataEvent, DataEvent>() {

            @Override
            public void processMatch(Map<String, List<DataEvent>> map, Context context, Collector<DataEvent> collector) throws Exception {


                DataEvent startEvent = map.get("start").get(0);
                DataEvent middleEvent = map.get("middle").get(0);
                DataEvent endEvent = map.get("end").get(0);
                collector.collect(new DataEvent( endEvent.getTimestamp(),
                        startEvent.getType()+"-"+ middleEvent.getType()+"-"+ endEvent.getType() + "("+startEvent.getTimestamp()+"-" +middleEvent.getTimestamp()+"-" +endEvent.getTimestamp()+")"));
            }
        });

and with patternStream.flatSelect like this:

DataStream<DataEvent> result = patternStream.flatSelect(
                new PatternFlatSelectFunction<DataEvent, DataEvent>() {
                    @Override
                    public void flatSelect(Map<String, List<DataEvent>> map, Collector<DataEvent> collector) throws Exception {
                        DataEvent startEvent = map.get("start").get(0);
                        DataEvent middleEvent = map.get("middle").get(0);
                        DataEvent endEvent = map.get("end").get(0);
                        collector.collect(new DataEvent(
                                endEvent.getTimestamp(),
                                startEvent.getType()+"-"+ middleEvent.getType()+"-"+ endEvent.getType() + "("+startEvent.getTimestamp()+"-" +middleEvent.getTimestamp()+"-" +endEvent.getTimestamp()+")"
                        ));
                    }
                }
        );

But the "result" event stream contains only the first matched pattern, not all of them. In both cases the output file contains only this line:

4:A-B-A(1-2-4)

I tried appending oneOrMore() to the pattern definition, but then the result is:

4:A-B-A(1-2-4)
4:A-B-A(1-2-4)

What I expect is for the process/select function to emit every possible (A, B, followed-by A) combination, i.e.:

4:A-B-A(1-2-4)
9:A-B-A(1-2-9)

Also, if I insert another line, "6,A", before line 6 of the input file, like this:

1,A
2,B
3,C
4,A
5,C
6,A
7,B
8,D
9,D
10,A
11,D

the result is:

10:A-B-A(6-7-10)
4:A-B-A(1-2-4)

This means it restarts pattern matching from scratch after finding the first match. How can I fix this?

My full code is as follows:

package org.example;


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;


import org.apache.flink.streaming.api.windowing.time.Time;

import java.io.File;
import java.io.FileInputStream;
import java.text.SimpleDateFormat;
import java.util.*;

import org.apache.flink.cep.CEP;

import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;


import java.io.FileOutputStream;
import java.io.IOException;



import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;


public class EventStreamCEP {

    public static List<DataEvent> originalStream = new ArrayList<>();
    public static List<DataEvent> complexEvents = new ArrayList<>();


    public static void main(String[] args) throws Exception {


        


        // Set up the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Define the input data format
        TextInputFormat inputFormat = new TextInputFormat(new Path("/home/majidlotfian/flink/flink-quickstart/PLprivacy/input_folder/input.txt"));

        // read the input data from a file
        DataStream<DataEvent> eventStream = env.readFile(inputFormat, "/home/majidlotfian/flink/flink-quickstart/PLprivacy/input_folder/input.txt")
                .map(new MapFunction<String, DataEvent>() {
                    @Override
                    public DataEvent map(String value) throws Exception {
                        // Parse the line into an event object
                        String[] fields = value.split(",");
                        long timestamp = Long.parseLong(fields[0]);
                        String type = fields[1];
                        DataEvent event = new DataEvent(timestamp,type);
                        //event.setTimestamp(timestamp);
                        return event;
                    }
                })

                // Assign timestamps and watermarks
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<DataEvent>() {
                    private long currentMaxTimestamp;
                    private final long maxOutOfOrderness = 10000; // 10 seconds

                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
                    }

                    @Override
                    public long extractTimestamp(DataEvent element, long previousElementTimestamp) {
                        long timestamp = element.getTimestamp();
                        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
                        return timestamp;
                    }
                });



        // Define a pattern to detect events in the stream
        Pattern<DataEvent, ?> pattern = Pattern.<DataEvent>begin("start")
                .where(new SimpleCondition<DataEvent>() {
                    @Override
                    public boolean filter(DataEvent dataEvent) throws Exception {
                        return dataEvent.getType().equals("A");
                    }
                }).next("middle")
                .where(new SimpleCondition<DataEvent>() {
                    @Override
                    public boolean filter(DataEvent dataEvent) throws Exception {
                        return dataEvent.getType().equals("B");
                    }

                }).followedBy("end")
                .where(new SimpleCondition<DataEvent>() {
                    @Override
                    public boolean filter(DataEvent dataEvent) throws Exception {
                        return dataEvent.getType().equals("A");
                    }

                });
        //pattern.oneOrMore();





        // Create a pattern stream using the defined pattern
        PatternStream<DataEvent> patternStream = CEP.pattern(eventStream, pattern);


        /*
        DataStream<DataEvent> result = patternStream.process(new PatternProcessFunction<DataEvent, DataEvent>() {

            @Override
            public void processMatch(Map<String, List<DataEvent>> map, Context context, Collector<DataEvent> collector) throws Exception {


                DataEvent startEvent = map.get("start").get(0);
                DataEvent middleEvent = map.get("middle").get(0);
                DataEvent endEvent = map.get("end").get(0);
                collector.collect(new DataEvent( endEvent.getTimestamp(),
                        startEvent.getType()+"-"+ middleEvent.getType()+"-"+ endEvent.getType() + "("+startEvent.getTimestamp()+"-" +middleEvent.getTimestamp()+"-" +endEvent.getTimestamp()+")"));
            }
        });

         */


        // Use PatternFlatSelectFunction to get all matched patterns
        DataStream<DataEvent> result = patternStream.flatSelect(
                new PatternFlatSelectFunction<DataEvent, DataEvent>() {
                    @Override
                    public void flatSelect(Map<String, List<DataEvent>> map, Collector<DataEvent> collector) throws Exception {
                        DataEvent startEvent = map.get("start").get(0);
                        DataEvent middleEvent = map.get("middle").get(0);
                        DataEvent endEvent = map.get("end").get(0);
                        collector.collect(new DataEvent(
                                endEvent.getTimestamp(),
                                startEvent.getType()+"-"+ middleEvent.getType()+"-"+ endEvent.getType() + "("+startEvent.getTimestamp()+"-" +middleEvent.getTimestamp()+"-" +endEvent.getTimestamp()+")"
                        ));
                    }
                }
        );





        // print the result stream

        result.print();

        // write the matched patterns to a text file
        String outputPath = "/home/majidlotfian/flink/flink-quickstart/PLprivacy/output_folder/output.txt";
        result.map(new MapFunction<DataEvent, String>() {
                    @Override
                    public String map(DataEvent value) throws Exception {
                        return value.getTimestamp()+":"+value.getType();
                    }
                })
                .writeAsText(outputPath, FileSystem.WriteMode.OVERWRITE)
                .setParallelism(1);  // ensure that events are written in order

        

        env.execute("EventStreamCEP");


    }
}
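The DataEvent class is not shown above; based on the constructor DataEvent(long, String) and the getTimestamp()/getType() calls, it is presumably a simple POJO along these lines (the field names are my assumption):

```java
// Assumed reconstruction of the DataEvent POJO the code above relies on:
// a (timestamp, type) pair with the constructor and getters that are used.
// In the real project it would be declared public in its own DataEvent.java,
// with a no-arg constructor so Flink can treat it as a POJO type.
class DataEvent {
    private long timestamp;
    private String type;

    public DataEvent() { }  // required for Flink POJO serialization

    public DataEvent(long timestamp, String type) {
        this.timestamp = timestamp;
        this.type = type;
    }

    public long getTimestamp() { return timestamp; }

    public String getType() { return type; }

    @Override
    public String toString() { return timestamp + ":" + type; }
}
```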

pattern-matching apache-flink flink-cep