我已经阅读了
project reactor
和map()
方法的flatMap()
文档,并且在this答案中也有很好的解释。
但是我的问题是关于我们何时使用
reactor
KafkaReceiver
。以下代码示例我有:
//start of consumption
public Disposable consumeMessage() {
return processKafkaRecord().subscribe(record -> log.info("success"),
error -> log.error("error logged" + error));
}
public Flux<String> processKafkaRecord() {
Flux<ReceiverRecord<String, String>> receiverRecord = Flux.defer(() -> inputEventReceiver.receive());
return receiverRecord.doOnNext(record -> log.info("Input Event receiver record {}", record.toString()))
.flatMap(this::processMessage)
.doOnComplete(() -> log.info("MSG=Completed consuming messages from topic={}" +"for Cancel Validation processing.", inputEventKafkaConfig.getTopic()));
}
private Flux<String> processMessage(final ReceiverRecord<String, String> receiverRecord) {
//logic
.flatMap(this::processOne);
.flatMap(this::processTwo);
.flatMap(this::processThree);
}
简而言之,我的疑问是,如果我在
.map()
中使用 processMessage
方法而不是 .flatMap()
函数,它对 KafkaReceiver
的性能会有什么不同吗?
疑问解释:当在
KafkaReceiver
中使用数据流时,我们已经使用 flux
进行消费,而在 processKafkaRecord
方法中,调用通过 flatMap()
方法进行,因此每个单独的记录都应与此异步处理只有。
一旦我们到达
processMessage()
方法,它实际上是在处理单个记录。现在,如果我的 processOne
、processTwo
和 processThree
方法必须针对每个单独的事件以同步方式发生,那么使用 flatMap()
而不是 map()
是否有意义。
一旦在 processKafkaRecord() 方法中调用了 flatMap() 方法,内部方法就已经对每个事件处于异步状态。如果 processMessage 中的每个进程都必须以同步方式发生,那么使用 map 会更有意义吗? 或者我这个结论是错误的,我们甚至应该在内部方法中使用 flatMap 来提高性能?
这真的取决于你的处理逻辑是什么,但看起来你正在混合并发、并行和异步/非阻塞执行。异步执行并不意味着一劳永逸。它更多的是关于无阻塞执行。逻辑仍然可以是顺序的。
map
对flatMap
flatMap
执行异步/反应逻辑,例如 http 请求、数据库读/写、其他 I/O 绑定操作并返回 Mono
或 Flux
.map
执行对象映射等同步逻辑。默认情况下,
flatMap
将同时处理 Queues.SMALL_BUFFER_SIZE = 256
个飞行中的内部序列。
您可以控制并发性
flatMap(item -> process(item), concurrency)
或使用concatMap
运算符如果你想顺序处理。检查 flatMap(..., int concurrency, int prefetch) 了解详情。
flatMap
有不同的“味道”。如果您需要顺序处理 - 使用 concatMap
基本上是 flatMap
并发 = 1.
根据用例,有多种选择。
有序消息处理 如果消息顺序很重要,消息的处理顺序应与生产者发送的顺序相同。 Kafka 保证每个分区的消息顺序。
在 Reactior Kafka 中,您可以通过对每个分区的数据进行分组然后按顺序处理
kafkaReceiver.receive()
.groupBy(message -> message.receiverOffset().topicPartition())
.flatMap(partitions -> partitions.concatMap(this::process));
无序消息处理
如果顺序不重要并且消息可以按任何顺序处理,我们可以通过并行处理多条消息来提高吞吐量。
kafkaReceiver.receive()
.flatMap(message -> process(message), concurrency);
无序消息处理将在少量分区上支持更高的吞吐量。对于有序消息处理,您需要增加分区数量以增加吞吐量。