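I have a Flink job and I don't understand why it doesn't print anything to stdout. I noticed that if I remove the filter and the watermarks, I do see the raw messages from my Kafka topic. But once I apply the aggregation and the watermarks, I get nothing. Here is the code in my main():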
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// create Kafka source
KafkaSource<VideoAdCompletedEvent> videoAdsSource = KafkaSource
.builder()
.setBootstrapServers(jobConfig.kafkaSourceServer)
.setTopics(jobConfig.sourceKafkaTopic).setGroupId(jobConfig.consumerGroupId)
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new VideoDeserSchema())
.build();
WatermarkStrategy<VideoAdCompletedEvent> watermarkStrategy = WatermarkStrategy
.<VideoAdCompletedEvent>forBoundedOutOfOrderness(Duration.ofSeconds(30)) // allow events up to 30 seconds out of order
.withTimestampAssigner(
(event, timestamp) -> event.getOriginTs()
)
.withIdleness(Duration.ofMillis(500));
DataStream<VideoAdCompletedEvent> videoAdsStream = env
.fromSource(videoAdsSource, watermarkStrategy, "Mobile Analytics Filtered Video Ads Topic")
.assignTimestampsAndWatermarks(watermarkStrategy);
DataStream<VideoAdCompletedEvent> filteredStream = videoAdsStream
.filter(new FilterFunction<VideoAdCompletedEvent>() {
@Override
public boolean filter(VideoAdCompletedEvent message) throws Exception {
return (message.getCompletedReason().equals("VIDEO_FINISHED") || message.getCompletedReason().equals("video_finished"));
}
});
DataStream<Tuple2<String, Integer>> aggStream = filteredStream
.keyBy((VideoAdCompletedEvent event) -> event.getVideoId())
.window(TumblingEventTimeWindows.of(Time.seconds(30)))
.aggregate(new DistinctUserIdAggregate(), new VideoIdCountProcessFunction());
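// the sink's element type must match aggStream (Tuple2<String, Integer>), otherwise addSink() does not compile
SinkFunction<Tuple2<String, Integer>> printSink = new PrintSinkFunction<>();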
aggStream.addSink(printSink);
env.execute("Test Job");
Any ideas?
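A few possible explanations:
The FilterFunction is dropping all of the events.
The VideoIdCountProcessFunction isn't producing any results.
Adding some extra logging and/or counters should help you find out where the events are being dropped.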
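One way to act on the logging/counters suggestion is to wrap the filter condition so it tallies what it keeps and what it drops. The sketch below is plain Java with no Flink dependency so it can run on its own; Event is a hypothetical stand-in for VideoAdCompletedEvent, CountingFilter is an illustrative helper (not a Flink class), and the sample values are made up. In the real job the same idea would live in a RichFilterFunction that registers counters through getRuntimeContext().getMetricGroup().counter(...). The sample also shows that equals() is case-sensitive, so a value such as "Video_Finished" would not pass the question's filter.

```java
import java.util.List;
import java.util.function.Predicate;

public class CountingFilterSketch {
    // Hypothetical stand-in for VideoAdCompletedEvent: only the field the filter reads.
    record Event(String completedReason) {}

    // Wraps a predicate and tallies how many values it keeps and drops.
    static final class CountingFilter<T> implements Predicate<T> {
        private final Predicate<T> delegate;
        long kept = 0;
        long dropped = 0;

        CountingFilter(Predicate<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public boolean test(T value) {
            boolean keep = delegate.test(value);
            if (keep) {
                kept++;
            } else {
                dropped++;
            }
            return keep;
        }
    }

    // Same condition as the FilterFunction in the question (case-sensitive equals).
    static boolean isFinished(Event e) {
        return e.completedReason().equals("VIDEO_FINISHED")
                || e.completedReason().equals("video_finished");
    }

    public static void main(String[] args) {
        CountingFilter<Event> filter = new CountingFilter<>(CountingFilterSketch::isFinished);
        List<Event> sample = List.of(
                new Event("VIDEO_FINISHED"),
                new Event("video_finished"),
                new Event("Video_Finished"), // mixed case, so equals() rejects it
                new Event("VIDEO_SKIPPED"));
        for (Event e : sample) {
            filter.test(e);
        }
        System.out.println("kept=" + filter.kept + ", dropped=" + filter.dropped);
        // prints kept=2, dropped=2
    }
}
```

If the dropped count in the real job equals the number of incoming events, the filter is the culprit; if events pass the filter but nothing is printed, look further downstream at the window and the process function.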
Note that you should remove the call to .assignTimestampsAndWatermarks(), since you are already passing the watermark strategy to the Kafka source in the .fromSource() call.
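Separately from the duplicate watermark assignment, it helps to keep in mind that an event-time window only emits its result once the watermark passes the end of the window. The plain-Java model below mimics that rule for the settings in the question (30 s tumbling windows, 30 s bounded out-of-orderness). It is a simplified illustration, not Flink's implementation, and it assumes epoch-millisecond timestamps, a zero window offset, and a watermark recomputed after every event (Flink actually emits watermarks periodically). The watermark is the highest timestamp seen minus 30 s minus 1 ms, and a window [start, start + 30 s) fires once the watermark reaches start + 30 s - 1 ms.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class EventTimeWindowModel {
    // Mirrors TumblingEventTimeWindows.of(Time.seconds(30)) in the question.
    static final long WINDOW_MS = 30_000;
    // Mirrors forBoundedOutOfOrderness(Duration.ofSeconds(30)) in the question.
    static final long OUT_OF_ORDERNESS_MS = 30_000;

    // Start of the tumbling window that contains ts (zero offset, ts >= 0 assumed).
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_MS);
    }

    // Bounded-out-of-orderness watermark: max timestamp seen, minus the bound, minus 1 ms.
    static long watermark(long maxTs) {
        return maxTs - OUT_OF_ORDERNESS_MS - 1;
    }

    // Processes timestamps in arrival order and returns the start of each window
    // in the order it fires. A window fires once the watermark reaches its last
    // millisecond (start + WINDOW_MS - 1); elements for an already-fired window are dropped.
    static List<Long> firedWindows(long[] timestamps) {
        TreeSet<Long> open = new TreeSet<>();
        List<Long> fired = new ArrayList<>();
        long maxTs = Long.MIN_VALUE;
        long wm = Long.MIN_VALUE;
        for (long ts : timestamps) {
            long start = windowStart(ts);
            if (start + WINDOW_MS - 1 <= wm) {
                continue; // late element: its window has already fired
            }
            open.add(start);
            maxTs = Math.max(maxTs, ts);
            wm = watermark(maxTs);
            while (!open.isEmpty() && open.first() + WINDOW_MS - 1 <= wm) {
                fired.add(open.pollFirst());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] ts = {1_000, 5_000, 29_000, 40_000, 59_000};
        System.out.println(firedWindows(ts)); // prints []
        long[] more = {1_000, 5_000, 29_000, 40_000, 59_000, 60_000};
        System.out.println(firedWindows(more)); // prints [0]
    }
}
```

With these settings the first window only fires after an event with a timestamp at least 60 s past that window's start has arrived. If the topic stops receiving newer events, the watermark never passes the end of the last windows, so they never fire and nothing reaches the print sink.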