这是带有抑制运算符的窗口的简单定义:
stream
.groupBy()
.windowedBy(SessionWindows.with(Duration.ofMinutes(30)).grace(Duration.ofMinutes(0)))
.aggregate(...) // implementation of aggregate function
.suppress(untilWindowCloses(unbounded())
.toStream()
// process last event here
...
所以我的问题是,抑制运算符如何检测一个事件是否是窗口的最后一个事件?想象一下,我删除了抑制运算符:
stream
.groupBy()
.windowedBy(SessionWindows.with(Duration.ofMinutes(30)).grace(Duration.ofMinutes(0)))
.aggregate(...) // implementation of aggregate function
.toStream()
...
我知道KTable
的每次更改都会产生两个事件:
null
值的记录以删除先前的记录我想做的是删除suppress
运算符并自己检测最后一条记录:
stream
.groupBy()
.windowedBy(SessionWindows.with(Duration.ofMinutes(30)).grace(Duration.ofMinutes(0)))
.aggregate(...) // implementation of aggregate function
.toStream()
.filter( /* detect the last record here */ )
此信息是否以DSL或处理器API公开?
信息仅间接公开。 suppress()
运算符使用状态存储来跟踪先前接收到的消息。这样可以将旧消息/新消息彼此进行比较,并确定何时实际发出某些消息。
注意,无状态filter()
无法实现。如果您想了解详细信息,则需要阅读源代码。
但是主要问题是:为什么要删除suppress()
开头?