Session windows in Flink

Question · Votes: 0 · Answers: 2

Can someone help me understand when and how windowing (session windows) happens in Flink, and how the sample below is processed?

For example: say there is a continuous stream of events flowing in, where the events are requests into an application and the responses the application provides. As part of the Flink processing, we need to work out how much time was taken to serve each request.

I understand there are time windows configured with an interval that fire every n seconds; once that time elapses, all the events in that time window are aggregated.

For example: assume the time window defined is 30 seconds. If one event arrives at time t and another at t+30, both will be processed, but an event arriving at t+31 will be ignored.

Please correct me if what I said above is wrong.

Question on the above: say one event arrives at time t and another at t+3 — will Flink still wait the full 30 seconds before aggregating and finalizing the results?

Now, how does this work in the case of session windows? If events are processed individually, and the broker timestamp is used at deserialization as the session_id of an individual event, will a session window be created for each event? If so, do we need to treat request and response events differently — because otherwise, won't the response event end up in its own session window?
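To check my own intuition about this, I modeled the merging rule in plain Java — as I understand it, a session window opens per element and windows closer together than the gap are merged. This is only a sketch of the merging semantics, not Flink code:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java model of session-window assignment: every event opens a
// window, and windows whose gaps overlap are merged into one session.
class SessionMergeSketch {
    // Returns the number of merged sessions for the given event timestamps.
    static int countSessions(List<Long> timestamps, long gapMillis) {
        if (timestamps.isEmpty()) return 0;
        List<Long> sorted = new ArrayList<>(timestamps);
        Collections.sort(sorted);
        int sessions = 1;
        for (int i = 1; i < sorted.size(); i++) {
            // a new session starts only when the silence since the
            // previous event is at least the session gap
            if (sorted.get(i) - sorted.get(i - 1) >= gapMillis) {
                sessions++;
            }
        }
        return sessions;
    }

    public static void main(String[] args) {
        // request at t=0, response at t=3s, 30s gap: one merged session
        System.out.println(countSessions(List.of(0L, 3_000L), 30_000L));  // 1
        // events 40s apart: two separate sessions
        System.out.println(countSessions(List.of(0L, 40_000L), 30_000L)); // 2
    }
}
```

So, if that model is right, a request and its response would only share a session window when they are keyed the same and arrive within the gap of each other.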

I will shortly post the example I am playing with (written in Java), but any input on the points above would help!

Process function

DTOs:

public class IncomingEvent{
    private String id;
    private Date timestamp;
    private String component;
    //getters and setters
}
public class FinalOutPutEvent{
    private String id;
    private long timeTaken;
    //getters and setters
}

=============================================== Deserialization of the incoming events:

public class IncomingEventDeserializationScheme implements KafkaDeserializationSchema<IncomingEvent> {

private ObjectMapper mapper;

public IncomingEventDeserializationScheme(ObjectMapper mapper) {
    this.mapper = mapper;
}

@Override
public TypeInformation<IncomingEvent> getProducedType() {
    return TypeInformation.of(IncomingEvent.class);
}

@Override
public boolean isEndOfStream(IncomingEvent nextElement) {
    return false;
}

@Override
public IncomingEvent deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
    if (record.value() == null) {
        return null;
    }
    try {
        IncomingEvent event = mapper.readValue(record.value(), IncomingEvent.class);
        if(event != null) {
            new SessionWindow(record.timestamp());
            event.setOffset(record.offset());
            event.setTopic(record.topic());
            event.setPartition(record.partition());
            event.setBrokerTimestamp(record.timestamp());
        }
        return event;
    } catch (Exception e) {
        return null;
    }
}

}

===============================================

Main logic:

public class MyEventJob {

private static final ObjectMapper mapper = new ObjectMapper();

public static void main(String[] args) throws Exception {

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    MyEventJob eventJob = new MyEventJob();

    InputStream inStream = eventJob.getFileFromResources("myConfig.properties");
    ParameterTool parameter = ParameterTool.fromPropertiesFile(inStream);
    Properties properties = parameter.getProperties();
    Integer timePeriodBetweenEvents = 120;
    String outWardTopicHostedOnServer = "localhost:9092";
    DataStreamSource<IncomingEvent> stream = env.addSource(new FlinkKafkaConsumer<>("my-input-topic", new IncomingEventDeserializationScheme(mapper), properties));
    SingleOutputStreamOperator<IncomingEvent> filteredStream = stream
        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<IncomingEvent>() {
            long eventTime;
            @Override
            public long extractTimestamp(IncomingEvent element, long previousElementTimestamp) {
                return element.getTimestamp();
            }
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(eventTime); 
            }
        })
        .map(e -> { e.setId(e.getComponent() + "_" + e.getTransactionId()); return e; });
    SingleOutputStreamOperator<FinalOutPutEvent> correlatedStream = filteredStream
        .keyBy(new KeySelector<IncomingEvent, String> (){
            @Override
            public String getKey(@Nonnull IncomingEvent input) throws Exception {
                return input.getId();
            }
        })
        .window(GlobalWindows.create()).allowedLateness(Time.seconds(defaultSliceTimePeriod))
        .trigger(new Trigger<IncomingEvent, Window>() {
            // anonymous classes cannot declare named constructors, so the
            // timeout is initialized inline instead
            private final long sessionTimeOut = timePeriodBetweenEvents * 1000L;
            @Override
            public TriggerResult onElement(IncomingEvent element, long timestamp, Window window, TriggerContext ctx)
                    throws Exception {
                ctx.registerProcessingTimeTimer(timestamp + sessionTimeOut); 
                return TriggerResult.CONTINUE;
            }
            @Override
            public TriggerResult onProcessingTime(long time, Window window, TriggerContext ctx) throws Exception {
                return TriggerResult.FIRE_AND_PURGE;
            }
            @Override
            public TriggerResult onEventTime(long time, Window window, TriggerContext ctx) throws Exception {
                    return TriggerResult.CONTINUE;
            }
            @Override
            public void clear(Window window, TriggerContext ctx) throws Exception {
                //check the clear method implementation
            }
        })
        .process(new ProcessWindowFunction<IncomingEvent, FinalOutPutEvent, String, SessionWindow>() {
        @Override
        public void process(String arg0,
                ProcessWindowFunction<IncomingEvent, FinalOutPutEvent, String, SessionWindow>.Context arg1,
                Iterable<IncomingEvent> input, Collector<FinalOutPutEvent> out) throws Exception {
            List<IncomingEvent> eventsIn = new ArrayList<>();
            input.forEach(eventsIn::add);
            if(eventsIn.size() == 1) {
                //Logic to handle the partial request/response
            } else if (eventsIn.size() == 2) {
                //Logic to handle the complete request/response and how much time it took
            }
        }
    } );
        FlinkKafkaProducer<FinalOutPutEvent> kafkaProducer = new FlinkKafkaProducer<>(
                outWardTopicHostedOnServer,            // broker list
                "target-topic",            // target topic
                new EventSerializationScheme(mapper));
    correlatedStream.addSink(kafkaProducer);
    env.execute("Streaming");
}

}

Thanks, Vicky


apache-flink flink-streaming
2 Answers

1 vote

Based on your description, I think what you want is a custom, stateful ProcessFunction keyed by the session_id. You would have a ValueState that stores the timestamp of the request event. When the matching response event arrives, you compute the delta, emit it (along with the session_id), and clear the state.
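For example (a plain-Java model of that keyed state — the `Map` stands in for per-key `ValueState<Long>`, and the class and method names are illustrative, not Flink API):

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of a keyed ProcessFunction correlating request/response:
// the map plays the role of per-session_id ValueState<Long>.
class RequestResponseCorrelator {
    private final Map<String, Long> requestTimestamps = new HashMap<>();

    // Returns the request->response latency in ms, or null while only the
    // request half of the pair has been seen (or no request exists).
    Long onEvent(String sessionId, boolean isRequest, long timestampMillis) {
        if (isRequest) {
            requestTimestamps.put(sessionId, timestampMillis); // ValueState.update()
            return null;
        }
        Long requestTs = requestTimestamps.remove(sessionId);  // read + clear state
        if (requestTs == null) {
            return null; // response without a matching request
        }
        return timestampMillis - requestTs; // the delta to emit downstream
    }
}
```

In the real job this logic would live in a `KeyedProcessFunction`, and you would also register a timer to clear the state of requests that never receive a response.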


0 votes

So, with the default trigger, each window finalizes once its time has fully passed. Depending on whether you use EventTime or ProcessingTime this can mean different things, but in general Flink always waits for a window to close before fully processing it. In your case, the event at t+31 would simply go to a different window — it is not dropped.
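You can see why t+31 lands in the next window from the window-start arithmetic tumbling windows use (a plain-Java sketch of the assignment rule, not Flink code):

```java
// Plain-Java sketch of tumbling-window assignment: each timestamp maps to
// the start of the (aligned) window that contains it.
class TumblingAssignSketch {
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    public static void main(String[] args) {
        long size = 30_000L; // 30-second windows
        System.out.println(windowStart(15_000L, size)); // 0     -> window [0s, 30s)
        System.out.println(windowStart(31_000L, size)); // 30000 -> window [30s, 60s)
    }
}
```

An event at t+31 is assigned to the [30s, 60s) window rather than being ignored.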
