Flink 作业不向 StdOut 打印任何内容

问题描述 投票:0回答:1

我有一个 Flink 作业,我不明白为什么它不会打印到标准输出。我注意到,如果删除过滤器和水印,我会看到来自我的 kafka 主题的原始消息。但是应用聚合和水印,我什么也没得到。这是我在 main() 下的代码

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // // create kafka source

    KafkaSource<VideoAdCompletedEvent> videoAdsSource = KafkaSource
    .builder()
    .setBootstrapServers(jobConfig.kafkaSourceServer)
    .setTopics(jobConfig.sourceKafkaTopic).setGroupId(jobConfig.consumerGroupId)
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new VideoDeserSchema())
    .build();

    WatermarkStrategy<VideoAdCompletedEvent> watermarkStrategy = WatermarkStrategy
    .<VideoAdCompletedEvent>forBoundedOutOfOrderness(Duration.ofSeconds(30)) // allow events up to 30 seconds out of order
    .withTimestampAssigner(
        (event, timestamp) -> event.getOriginTs()
    )
    .withIdleness(Duration.ofMillis(500));

    DataStream<VideoAdCompletedEvent> videoAdsStream = env
    .fromSource(videoAdsSource, watermarkStrategy, "Mobile Analytics Filtered Video Ads Topic")
    .assignTimestampsAndWatermarks(watermarkStrategy);

    DataStream<VideoAdCompletedEvent> filteredStream = videoAdsStream
    .filter(new FilterFunction<VideoAdCompletedEvent>() {
        @Override
        public boolean filter(VideoAdCompletedEvent message) throws Exception {
          return (message.getCompletedReason().equals("VIDEO_FINISHED") || message.getCompletedReason().equals("video_finished"));
        }
    });

    DataStream<Tuple2<String, Integer>>  aggStream = filteredStream
        .keyBy((VideoAdCompletedEvent event) -> event.getVideoId())
        .window(TumblingEventTimeWindows.of(Time.seconds(30)))
        .aggregate(new DistinctUserIdAggregate(), new VideoIdCountProcessFunction());

    SinkFunction<VideoAdCompletedEvent> printSink = new PrintSinkFunction();

    aggStream.addSink(printSink);

    env.execute("Test Job");

有什么想法吗?

apache-flink flink-streaming watermark
1个回答
0
投票

一些可能的解释...

  1. 您的
    FilterFunction
    正在删除所有事件。
  2. 您的
    VideoIdCountProcessFunction
    未产生任何结果。
  3. 所有事件在相同的 60 秒时间间隔内都有水印,因此您永远不会生成导致窗口触发的水印。

添加额外的日志记录和/或计数器将有助于找出事件被删除的位置。

请注意,应删除对

.assignTimestampsAndWatermarks()
的调用,因为您已经通过
.fromSource()
调用将水印策略传递到 Kafka 源。

© www.soinside.com 2019 - 2024. All rights reserved.