我刚刚第一次定义了一个KStreams拓扑,并希望快速进行健全性检查,以确保我没有做一些愚蠢的事情(或者如果有更好的方法。)基本上我归结为:
[input-topic] --> alltime-store
--> thisweek-store --> (logic) --> {new-record} -->[input-topic]
从输入主题读取记录,然后将groupBy()
和Materialized
读取到两个单独的计数存储中。
thisweek-store
是windowedBy
一个特定的持续时间,并传递给一个应用一些逻辑的filter
,一个例子可能是:
if value > 10
then send new-record
如果不是很明显,在非常高的水平上,如果有人在一周内做了足够的事情,我会尝试奖励。
在拓扑中创建这样的循环是否可以?
我能看到的唯一明显的问题是你可能会创建一个无限循环;但希望在[逻辑]块中可以防止这种情况。
在同一个应用程序中使用Streams.to()
输出主题作为builder.stream()
输入主题是完全没问题的。
因为可以有有效的场景,即强化学习。如果您正在训练某些数据,您可以根据之前的数据生成较新的事件。