I have a simple Flink application that tries to detect a pattern on an event stream created from the following text file:
1,A
2,B
3,C
4,A
5,C
6,B
7,D
8,D
9,A
10,D
I define the pattern like this:
Pattern<DataEvent, ?> pattern = Pattern.<DataEvent>begin("start")
        .where(new SimpleCondition<DataEvent>() {
            @Override
            public boolean filter(DataEvent dataEvent) throws Exception {
                return dataEvent.getType().equals("A");
            }
        }).next("middle")
        .where(new SimpleCondition<DataEvent>() {
            @Override
            public boolean filter(DataEvent dataEvent) throws Exception {
                return dataEvent.getType().equals("B");
            }
        }).followedBy("end")
        .where(new SimpleCondition<DataEvent>() {
            @Override
            public boolean filter(DataEvent dataEvent) throws Exception {
                return dataEvent.getType().equals("A");
            }
        });
and I run the pattern detection with patternStream.process like this:
DataStream<DataEvent> result = patternStream.process(new PatternProcessFunction<DataEvent, DataEvent>() {
    @Override
    public void processMatch(Map<String, List<DataEvent>> map, Context context, Collector<DataEvent> collector) throws Exception {
        DataEvent startEvent = map.get("start").get(0);
        DataEvent middleEvent = map.get("middle").get(0);
        DataEvent endEvent = map.get("end").get(0);
        collector.collect(new DataEvent(endEvent.getTimestamp(),
                startEvent.getType() + "-" + middleEvent.getType() + "-" + endEvent.getType()
                        + "(" + startEvent.getTimestamp() + "-" + middleEvent.getTimestamp() + "-" + endEvent.getTimestamp() + ")"));
    }
});
and, alternatively, with patternStream.flatSelect like this:
DataStream<DataEvent> result = patternStream.flatSelect(
        new PatternFlatSelectFunction<DataEvent, DataEvent>() {
            @Override
            public void flatSelect(Map<String, List<DataEvent>> map, Collector<DataEvent> collector) throws Exception {
                DataEvent startEvent = map.get("start").get(0);
                DataEvent middleEvent = map.get("middle").get(0);
                DataEvent endEvent = map.get("end").get(0);
                collector.collect(new DataEvent(
                        endEvent.getTimestamp(),
                        startEvent.getType() + "-" + middleEvent.getType() + "-" + endEvent.getType()
                                + "(" + startEvent.getTimestamp() + "-" + middleEvent.getTimestamp() + "-" + endEvent.getTimestamp() + ")"
                ));
            }
        }
);
But the "result" stream contains only the first matched pattern, not all of them. In both cases the output file contains only this line:
4:A-B-A(1-2-4)
I also tried oneOrMore() at the end of the pattern definition, but the result was:
4:A-B-A(1-2-4)
4:A-B-A(1-2-4)
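For reference, this is how I applied it; it matches the line that is now commented out in the full code below. As far as I understand, oneOrMore() changes the quantifier of the last stage ("end") in place:

// oneOrMore() called on the already-built pattern, so it targets the last stage ("end")
pattern.oneOrMore();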
What I want is for the process/select function to pick up every possible combination of (A - B - followed-by - A), i.e.:
4:A-B-A(1-2-4)
9:A-B-A(1-2-9)
Also, if I add one more line to the input file, inserting "6,A" before the old line 6, like this:
1,A
2,B
3,C
4,A
5,C
6,A
7,B
8,D
9,D
10,A
11,D
the result is:
10:A-B-A(6-7-10)
4:A-B-A(1-2-4)
This suggests that after finding the first match it restarts the pattern matching from scratch. How can I fix this?
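One thing I am unsure about is whether an after-match skip strategy plays a role here. From the CEP docs my understanding is that it would be attached where the pattern begins, roughly like this (a sketch only, which I have not verified; noSkip() is documented as the default, so I don't know whether it changes anything):

// Sketch (untested): attaching an explicit after-match skip strategy.
// Needs: import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.noSkip();
Pattern<DataEvent, ?> patternWithSkip = Pattern.<DataEvent>begin("start", skipStrategy)
        .where(new SimpleCondition<DataEvent>() {
            @Override
            public boolean filter(DataEvent dataEvent) throws Exception {
                return dataEvent.getType().equals("A");
            }
        });
// ... then .next("middle") and .followedBy("end") with the same conditions as above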
My complete code is this:
package org.example;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.io.File;
import java.io.FileInputStream;
import java.text.SimpleDateFormat;
import java.util.*;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import java.io.FileOutputStream;
import java.io.IOException;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class EventStreamCEP {

    public static List<DataEvent> originalStream = new ArrayList<>();
    public static List<DataEvent> complexEvents = new ArrayList<>();

    public static void main(String[] args) throws Exception {
        // Set up the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Define the input data format
        TextInputFormat inputFormat = new TextInputFormat(new Path("/home/majidlotfian/flink/flink-quickstart/PLprivacy/input_folder/input.txt"));

        // Read the input data from a file (processed once by default)
        DataStream<DataEvent> eventStream = env.readFile(inputFormat, "/home/majidlotfian/flink/flink-quickstart/PLprivacy/input_folder/input.txt")
                .map(new MapFunction<String, DataEvent>() {
                    @Override
                    public DataEvent map(String value) throws Exception {
                        // Parse the line "timestamp,type" into an event object
                        String[] fields = value.split(",");
                        long timestamp = Long.parseLong(fields[0]);
                        String type = fields[1];
                        return new DataEvent(timestamp, type);
                    }
                })
                // Assign timestamps and watermarks
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<DataEvent>() {
                    private long currentMaxTimestamp;
                    private final long maxOutOfOrderness = 10000; // 10 seconds

                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
                    }

                    @Override
                    public long extractTimestamp(DataEvent element, long previousElementTimestamp) {
                        long timestamp = element.getTimestamp();
                        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
                        return timestamp;
                    }
                });

        // Define a pattern to detect events in the stream
        Pattern<DataEvent, ?> pattern = Pattern.<DataEvent>begin("start")
                .where(new SimpleCondition<DataEvent>() {
                    @Override
                    public boolean filter(DataEvent dataEvent) throws Exception {
                        return dataEvent.getType().equals("A");
                    }
                }).next("middle")
                .where(new SimpleCondition<DataEvent>() {
                    @Override
                    public boolean filter(DataEvent dataEvent) throws Exception {
                        return dataEvent.getType().equals("B");
                    }
                }).followedBy("end")
                .where(new SimpleCondition<DataEvent>() {
                    @Override
                    public boolean filter(DataEvent dataEvent) throws Exception {
                        return dataEvent.getType().equals("A");
                    }
                });
        //pattern.oneOrMore();

        // Create a pattern stream using the defined pattern
        PatternStream<DataEvent> patternStream = CEP.pattern(eventStream, pattern);

        /*
        DataStream<DataEvent> result = patternStream.process(new PatternProcessFunction<DataEvent, DataEvent>() {
            @Override
            public void processMatch(Map<String, List<DataEvent>> map, Context context, Collector<DataEvent> collector) throws Exception {
                DataEvent startEvent = map.get("start").get(0);
                DataEvent middleEvent = map.get("middle").get(0);
                DataEvent endEvent = map.get("end").get(0);
                collector.collect(new DataEvent(endEvent.getTimestamp(),
                        startEvent.getType() + "-" + middleEvent.getType() + "-" + endEvent.getType()
                                + "(" + startEvent.getTimestamp() + "-" + middleEvent.getTimestamp() + "-" + endEvent.getTimestamp() + ")"));
            }
        });
        */

        // Use PatternFlatSelectFunction to get all matched patterns
        DataStream<DataEvent> result = patternStream.flatSelect(
                new PatternFlatSelectFunction<DataEvent, DataEvent>() {
                    @Override
                    public void flatSelect(Map<String, List<DataEvent>> map, Collector<DataEvent> collector) throws Exception {
                        DataEvent startEvent = map.get("start").get(0);
                        DataEvent middleEvent = map.get("middle").get(0);
                        DataEvent endEvent = map.get("end").get(0);
                        collector.collect(new DataEvent(
                                endEvent.getTimestamp(),
                                startEvent.getType() + "-" + middleEvent.getType() + "-" + endEvent.getType()
                                        + "(" + startEvent.getTimestamp() + "-" + middleEvent.getTimestamp() + "-" + endEvent.getTimestamp() + ")"
                        ));
                    }
                }
        );

        // Print the matched event stream
        result.print();

        // Write the matched patterns to a text file
        String outputPath = "/home/majidlotfian/flink/flink-quickstart/PLprivacy/output_folder/output.txt";
        result.map(new MapFunction<DataEvent, String>() {
                    @Override
                    public String map(DataEvent value) throws Exception {
                        return value.getTimestamp() + ":" + value.getType();
                    }
                })
                .writeAsText(outputPath, FileSystem.WriteMode.OVERWRITE)
                .setParallelism(1); // ensure that events are written in order

        env.execute("EventStreamCEP");
    }
}
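For completeness, DataEvent (not shown above) is essentially a (timestamp, type) POJO; here is a minimal sketch with the constructor and accessors the code uses:

// Minimal sketch of the DataEvent POJO assumed by the code above.
// The no-arg constructor and setters keep it a valid Flink POJO type.
public class DataEvent {
    private long timestamp;
    private String type;

    public DataEvent() {
    }

    public DataEvent(long timestamp, String type) {
        this.timestamp = timestamp;
        this.type = type;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }
}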