KStream抑制运算符如何确定窗口的最后一条记录?

问题描述 投票:0回答:1

这是带有抑制运算符的窗口的简单定义:

stream
  .groupBy()
  .windowedBy(SessionWindows.with(Duration.ofMinutes(30)).grace(Duration.ofMinutes(0)))
  .aggregate(...) // implementation of aggregate function
  .suppress(untilWindowCloses(unbounded())
  .toStream()
  // process last event here
  ... 

所以我的问题是,抑制运算符如何检测一个事件是否是窗口的最后一个事件?想象一下,我删除了抑制运算符:

stream
  .groupBy()
  .windowedBy(SessionWindows.with(Duration.ofMinutes(30)).grace(Duration.ofMinutes(0)))
  .aggregate(...) // implementation of aggregate function
  .toStream()
  ... 

我知道KTable的每次更改都会产生两个事件:

  1. 具有null值的记录以删除先前的记录
  2. 具有新值的新记录

我想做的是删除suppress运算符并自己检测最后一条记录:

stream
  .groupBy()
  .windowedBy(SessionWindows.with(Duration.ofMinutes(30)).grace(Duration.ofMinutes(0)))
  .aggregate(...) // implementation of aggregate function
  .toStream()
  .filter( /* detect the last record here */ )

此信息是否以DSL或处理器API公开?

apache-kafka apache-kafka-streams
1个回答
0
投票

信息仅间接公开。 suppress()运算符使用状态存储来跟踪先前接收到的消息。这样可以将旧消息/新消息彼此进行比较,并确定何时实际发出某些消息。

注意,无状态filter()无法实现。如果您想了解详细信息,则需要阅读源代码。

但是主要问题是:为什么要删除suppress()开头?

© www.soinside.com 2019 - 2024. All rights reserved.