Webflux:kafka 接收器中 .map() 和 .flatMap() 行为的区别

问题描述 投票:0回答:1

我已经阅读了

project reactor
map()
方法的
flatMap()
文档,并且在this答案中也有很好的解释。

但是我的问题是关于我们何时使用

reactor
KafkaReceiver
。以下代码示例我有:

    //start of consumption    
    public Disposable consumeMessage() {
        return processKafkaRecord().subscribe(record -> log.info("success"),
                error -> log.error("error logged" + error));
    }

    public Flux<String> processKafkaRecord() {
         Flux<ReceiverRecord<String, String>> receiverRecord = Flux.defer(() -> inputEventReceiver.receive());
         return receiverRecord.doOnNext(record -> log.info("Input Event receiver record {}", record.toString()))
            .flatMap(this::processMessage)
            .doOnComplete(() -> log.info("MSG=Completed consuming messages from topic={}" +"for Cancel Validation processing.", inputEventKafkaConfig.getTopic()));
    }

    private Flux<String> processMessage(final ReceiverRecord<String, String> receiverRecord) {
         //logic
        .flatMap(this::processOne);
        .flatMap(this::processTwo);
        .flatMap(this::processThree);
    }

简而言之,我的疑问是,如果我在

.map()
中使用
processMessage
方法而不是
.flatMap()
函数,它对
KafkaReceiver
的性能会有什么不同吗?

疑问解释:当在

KafkaReceiver
中使用数据流时,我们已经使用
flux
进行消费,而在
processKafkaRecord
方法中,调用通过
flatMap()
方法进行,因此每个单独的记录都应与此异步处理只有。

一旦我们到达

processMessage()
方法,它实际上是在处理单个记录。现在,如果我的
processOne
processTwo
processThree
方法必须针对每个单独的事件以同步方式发生,那么使用
flatMap()
而不是
map()
是否有意义。

一旦在 processKafkaRecord() 方法中调用了 flatMap() 方法,内部方法就已经对每个事件处于异步状态。如果 processMessage 中的每个进程都必须以同步方式发生,那么使用 map 会更有意义吗? 或者我这个结论是错误的,我们甚至应该在内部方法中使用 flatMap 来提高性能?

spring-boot spring-webflux reactive-programming project-reactor
1个回答
0
投票

这真的取决于你的处理逻辑是什么,但看起来你正在混合并发、并行和异步/非阻塞执行。异步执行并不意味着一劳永逸。它更多的是关于无阻塞执行。逻辑仍然可以是顺序的。

map
flatMap

  • 使用
    flatMap
    执行异步/反应逻辑,例如 http 请求、数据库读/写、其他 I/O 绑定操作并返回
    Mono
    Flux
    .
  • 使用
    map
    执行对象映射等同步逻辑。

并发性

默认情况下,

flatMap
将同时处理
Queues.SMALL_BUFFER_SIZE = 256
个飞行中的内部序列。

您可以控制并发性

flatMap(item -> process(item), concurrency)
或使用
concatMap
运算符如果你想顺序处理。检查 flatMap(..., int concurrency, int prefetch) 了解详情。

flatMap
有不同的“味道”。如果您需要顺序处理 - 使用
concatMap
基本上是
flatMap
并发 = 1.

Kafka 有序处理与无序处理

根据用例,有多种选择。

有序消息处理 如果消息顺序很重要,消息的处理顺序应与生产者发送的顺序相同。 Kafka 保证每个分区的消息顺序。

在 Reactior Kafka 中,您可以通过对每个分区的数据进行分组然后按顺序处理

kafkaReceiver.receive()
        .groupBy(message -> message.receiverOffset().topicPartition())
        .flatMap(partitions -> partitions.concatMap(this::process));

无序消息处理

如果顺序不重要并且消息可以按任何顺序处理,我们可以通过并行处理多条消息来提高吞吐量。

kafkaReceiver.receive()
        .flatMap(message -> process(message), concurrency);

无序消息处理将在少量分区上支持更高的吞吐量。对于有序消息处理,您需要增加分区数量以增加吞吐量。

© www.soinside.com 2019 - 2024. All rights reserved.