我有一个测试像这样的滑动窗口的测试:
.groupByKey
.windowedBy {
TimeWindows.of(Duration.ofMinutes(10))
.advanceBy(Duration.ofMinutes(1)).grace(Duration.ofMillis(0))
}
.aggregate(...)
.suppress(Suppressed.untilWindowCloses(unbounded()))
我需要测试滑动窗口在一步之后产生正确的输出。我做了这样的事情:
for (i <- 0 to 8) testDriver.pipeInput(record..., T0 to T8)// produce one record every minute (9 records)
for (i <- 0 to 8)testDriver.pipeInput(record..., T8 + 5min) // these comes with a timestamp > 10 minutes tso there are on a second window
第一个循环应在第一个时间窗口内产生结果,并且在滑动1分钟后,第二个循环应产生第二个记录,并带有第二个窗口的结果(滑动后)我该如何检查?我不完全了解如何使用输出读取器来检查这两个结果。
[只要您希望输出可用,就会调用testDriver.readOutput()
。当然,您需要将结果写入输出主题,例如,
...
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream()
.to("output-topic");