Kafka流如何测试滑动窗口?

问题描述 投票:0回答:1

我有一个测试像这样的滑动窗口的测试:

.groupByKey
.windowedBy {
        TimeWindows.of(Duration.ofMinutes(10))
          .advanceBy(Duration.ofMinutes(1)).grace(Duration.ofMillis(0))
      }
      .aggregate(...)
      .suppress(Suppressed.untilWindowCloses(unbounded()))

我需要测试滑动窗口在一步之后产生正确的输出。我做了这样的事情:

for (i <- 0 to 8) testDriver.pipeInput(record..., T0 to T8)// produce one record every minute (9 records)

for (i <- 0 to 8)testDriver.pipeInput(record..., T8 + 5min) // these comes with a timestamp > 10 minutes tso there are on a second window

第一个循环应在第一个时间窗口内产生结果,并且在滑动1分钟后,第二个循环应产生第二个记录,并带有第二个窗口的结果(滑动后)我该如何检查?我不完全了解如何使用输出读取器来检查这两个结果。

scala apache-kafka streaming apache-kafka-streams
1个回答
1
投票

[只要您希望输出可用,就会调用testDriver.readOutput()。当然,您需要将结果写入输出主题,例如,

...
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream()
.to("output-topic");
© www.soinside.com 2019 - 2024. All rights reserved.