使用Flux缓冲kafka消息

问题描述 投票:0回答:1

我有kafka输入和输出主题。我要做的是我在主题A中编写内容,并希望在主题B中接收消息但具有缓冲功能。我的代码如下:

    @Bean
    public Function<Flux<String>, Flux<String>> stringFlow() {
        return flux ->
            flux.window(4)
                .flatMap(m -> m);
    }

输入主题:

A
A
B

输出主题:

A
% Reached end of topic test-topic [0] at offset 78
A
% Reached end of topic test-topic [0] at offset 79
B
% Reached end of topic test-topic [0] at offset 80

我期望Flux是带有消息的kafka流,而不是每个kafka消息都是Flux。有没有办法缓冲kafka消息,以便我可以存储,即10条消息,删除重复项,然后发布它们?

apache-kafka project-reactor spring-cloud-stream
1个回答
0
投票

buffer(),而不是window()是您只需要将结果作为集合而不是Flux返回的运算符。

如果您还想从结果集合中过滤重复项,则可以通过传递HashSet供应商作为第二个参数来使用Set而不是List来存储值:

flux -> flux.buffer(10, HashSet::new)

© www.soinside.com 2019 - 2024. All rights reserved.