按事件时间对多个 Kafka 主题中的事件进行排序

问题描述 投票:0回答:2

我们正在尝试按我们从三个 Kafka 主题消耗的事件时间对事件进行排序。每个源主题都有三个分区,我们将 Flink 并行度也设置为 3。从 Kafka 读取事件后,我们将每个事件映射为通用格式并将它们联合起来。最后,我们通过共享的

eventID
将它们联合起来,并尝试使用 ProcessFunction 对它们进行排序,如here所述。

我们为事件分配事件时间,如下所示:

WatermarkStrategy<String> watermarkStrategy = 
    WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofMinutes(10))
    .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
        @Override
        public long extractTimestamp(String element, long recordTimestamp) {
            ...
        }
    });

KafkaSource<String> topic = KafkaSource.<String>builder()
    .setBootstrapServers("...")
    .setTopics("my_topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

DataStreamSource<String> sourceStream = env.fromSource(topic, watermarkStrategy, "Kafka Source");

(简化的)处理函数如下所示:


public class OrderProcessFunction extends KeyedProcessFunction<String, Tuple2<LocalDateTime, String>, Tuple2<LocalDateTime, String>> {

    private transient MapState<Long, List<Tuple2<LocalDateTime, String>>> queueState = null;

    @Override
    public void open(Configuration config) {
        TypeInformation<Long> key = TypeInformation.of(new TypeHint<Long>() {});
        TypeInformation<List<Tuple2<LocalDateTime, String>>> value = TypeInformation.of(new TypeHint<List<Tuple2<LocalDateTime, String>>>() {});
        queueState = getRuntimeContext().getMapState(new MapStateDescriptor<>("events-by-timestamp", key, value));
    }

    @Override
    public void processElement(Tuple2<LocalDateTime, String> event,
                               KeyedProcessFunction<String, Tuple2<LocalDateTime, String>, Tuple2<LocalDateTime, String>>.Context ctx,
                               Collector<Tuple2<LocalDateTime, String>> out) throws Exception {

        TimerService timerService = ctx.timerService();
        if (ctx.timestamp() > timerService.currentWatermark()) {

            List<Tuple2<LocalDateTime, String>> listEvents = queueState.get(ctx.timestamp());
            if (isEmpty(listEvents)) {
                listEvents = new ArrayList<>();
            }

            listEvents.add(event);
            queueState.put(ctx.timestamp(), listEvents);
            timerService.registerEventTimeTimer(ctx.timestamp());
        } else {
            // Event considered late, write to side output to debug
            ctx.output(sideOutputLateEventsProcessFunction, event);
        }
    }

    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<String, Tuple2<LocalDateTime, String>, Tuple2<LocalDateTime, String>>.OnTimerContext ctx,
                        Collector<Tuple2<LocalDateTime, String>> out) throws Exception {
        queueState.get(timestamp).forEach(out::collect);
        queueState.remove(timestamp);
    }
}

话题已经有近半年的历史事件了,我们也想订购。其中一个主题比其他两个主题要多得多,据我们了解,这将导致处理函数缓冲所有事件,直到水印从每个 Flink Kafka 源到达。

但是,当我们启动 Flink 作业时,只有少数事件被正确排序,但大多数事件(对于所有流)很快就会被认为在 ProcessFunction 中较晚,因此被丢弃。

我们有以下问题:

  • 我们从Flink Kafka源文档了解到水印是跨所有分区计算的。这意味着 Flink 首先需要为每个分区处理至少一个事件,然后生成该分区的水印。 Flink Kafka Source 将发出所有分区水印中最小的水印。那是对的吗?这是否也适用于
    SerializableTimestampAssigner
    ,因为目前看来不行。
  • 调试 Flink Kafka Source 的事件时间/水印机制以了解为什么处理函数会丢弃大部分事件的最佳方法是什么?我们目前正在使用侧输出来处理最新数据,并使用 Flink WebUI 来处理水印。

谢谢!

events apache-kafka apache-flink flink-streaming
2个回答
1
投票

在您的

processElement()
方法中,您需要根据传入事件计算“窗口开始时间”。这基于您需要的窗口大小,以便按时间戳正确排列事件。目前,您构建了一个由显式事件时间戳作为键控的列表,因此不会发生窗口化。

您还希望仅在向地图添加新条目时创建计时器,并将计时器的时间设置为窗口开始时间+窗口大小,以便在窗口预计“完成”时触发”.


0
投票

我们意识到我们在 Flink 中面临这个错误,该错误仅在后续版本中修复。

我们为所有 Flink Kafka 消费者配置了空闲状态,但该错误错误地将流消费者标记为空闲状态。这导致了流程功能中出现了很多迟到的事件。

© www.soinside.com 2019 - 2024. All rights reserved.