我有kafka输入和输出主题。我要做的是我在主题A中编写内容,并希望在主题B中接收消息但具有缓冲功能。我的代码如下:
@Bean
public Function<Flux<String>, Flux<String>> stringFlow() {
return flux ->
flux.window(4)
.flatMap(m -> m);
}
输入主题:
A
A
B
输出主题:
A
% Reached end of topic test-topic [0] at offset 78
A
% Reached end of topic test-topic [0] at offset 79
B
% Reached end of topic test-topic [0] at offset 80
我期望Flux是带有消息的kafka流,而不是每个kafka消息都是Flux。有没有办法缓冲kafka消息,以便我可以存储,即10条消息,删除重复项,然后发布它们?
buffer()
,而不是window()
是您只需要将结果作为集合而不是Flux
返回的运算符。
如果您还想从结果集合中过滤重复项,则可以通过传递HashSet供应商作为第二个参数来使用Set
而不是List
来存储值:
flux -> flux.buffer(10, HashSet::new)