Reactor Kafka 同步消费消息并异步处理它们

问题描述 投票:0回答:1

我对反应世界和使用 Spring Webflux + reactor Kafka 很陌生。

        kafkaReceiver
                .receive()
//                .publishOn(Schedulers.boundedElastic())
                .doOnNext(a -> log.info("Reading message: {}", a.value()))
                .concatMap(kafkaRecord ->
                 //perform DB operation
                 //kafkaRecord.receiverOffset.ackwnowledge         
                )
                .doOnError(e -> log.error("Error", e))
                .retry()
                .subscribe();

我明白,为了并行化消息消费,我必须为每个分区实例化一个 KafkaReceiver,但是是否有可能/建议分区以同步方式读取消息并异步处理它们(包括手动确认)?

所以这是所需的输出:

Reading message:1
Reading message:2
Reading message:3
Reading message:4
Stored message 1 in DB + ack
Reading message:5
Stored message 2 in DB + ack
Stored message 5 in DB + ack
Stored message 3 in DB + ack
Stored message 4 in DB + ack

如果出现错误,我正在考虑将记录发布到 DLT。

我也尝试过使用 flatMap,但似乎整个处理在单个线程上按顺序发生。此外,如果我要发布到新的调度程序,则处理会在新的单个线程上进行。 如果我问的是可能的,有人可以帮我提供代码片段吗?

spring-webflux project-reactor reactor-kafka
1个回答
0
投票

Reactor Kafka 建立在

KafkaConsumer
API 之上,轮询周期与处理逻辑分离,并在需要时使用 backpresure 和暂停消费者。默认情况下,
KafkaReceiver
Schedulers.single
线程上发布获取的记录。

现在,根据您的逻辑,您可以顺序或并行处理数据和提交偏移量。对于并发处理,您可以使用

flatMap
默认情况下,同时处理
Queues.SMALL_BUFFER_SIZE = 256
消息。

您可以控制并发性

flatMap(item -> process(item), concurrency)
或使用
concatMap
运算符如果你想顺序处理。检查 flatMap(..., int concurrency, int prefetch) 了解详情。

kafkaReceiver.receive()
    .flatMap(rec -> process(rec), concurrency)

Kafka 有序处理与无序处理

根据用例,有多种选择。

有序消息处理

如果消息顺序很重要,消息的处理顺序应与生产者发送的顺序相同。 Kafka 保证每个分区的消息顺序。

在 Reactior Kafka 中,您可以通过对每个分区的数据进行分组然后按顺序处理

kafkaReceiver.receive()
        .groupBy(message -> message.receiverOffset().topicPartition())
        .flatMap(partitions -> partitions.concatMap(this::process));

无序消息处理

如果顺序不重要并且消息可以按任何顺序处理,我们可以通过并行处理多条消息来提高吞吐量。

kafkaReceiver.receive()
        .flatMap(message -> process(message), concurrency);

Reactor kafka 支持 乱序提交 并且框架将根据需要推迟提交,直到填补任何“空白”。这消除了应用程序跟踪偏移量并按正确顺序提交它们的需要。

一般来说,无序消息处理将在少量分区上支持更高的吞吐量。对于有序消息处理,您需要增加分区数以增加吞吐量。您可以在这里找到一些额外的注意事项如何提高消息吞吐量

重要的是要注意(@PatPanda 感谢更正)如果你没有任何异步逻辑(延迟、http 调用或任何其他 I/O 绑定反应代码),订阅和执行将在同一个线程上继续. 要切换到另一个

Scheduler
,我们需要添加
.subscribeOn
(例如
.subscribeOn(Schedulers.parallel())
或.subscribeOn(Schedulers.boundedElastic())
) to the 
process` 函数。

kafkaReceiver.receive()
    .flatMap(rec -> process(rec).subscribeOn(Schedulers.parallel()))

如果添加日志记录,您会看到所有记录都在

kafka-receiver-2
上接收,但在不同的
parallel-#
线程上处理。请注意,记录是按分区顺序接收的。

12:50:08.347  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-2, partition: 0
12:50:08.349  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-3, partition: 0
12:50:08.350  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-4, partition: 0
12:50:08.350  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-6, partition: 0
12:50:08.351  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-9, partition: 0
12:50:08.353  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-0, partition: 2
12:50:08.354  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-8, partition: 2
12:50:08.355  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-1, partition: 1
12:50:08.356  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-5, partition: 1
12:50:08.358  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-7, partition: 1
12:50:09.353  [parallel-3] INFO [c.e.d.KafkaConsumerTest] - process: value-2, partition: 0
12:50:09.353  [parallel-6] INFO [c.e.d.KafkaConsumerTest] - process: value-6, partition: 0
12:50:09.353  [parallel-4] INFO [c.e.d.KafkaConsumerTest] - process: value-3, partition: 0
12:50:09.353  [parallel-5] INFO [c.e.d.KafkaConsumerTest] - process: value-4, partition: 0
12:50:09.355  [parallel-7] INFO [c.e.d.KafkaConsumerTest] - process: value-9, partition: 0
12:50:09.360  [parallel-10] INFO [c.e.d.KafkaConsumerTest] - process: value-1, partition: 1
12:50:09.360  [parallel-9] INFO [c.e.d.KafkaConsumerTest] - process: value-8, partition: 2
12:50:09.360  [parallel-8] INFO [c.e.d.KafkaConsumerTest] - process: value-0, partition: 2
12:50:09.361  [parallel-11] INFO [c.e.d.KafkaConsumerTest] - process: value-5, partition: 1
12:50:09.361  [parallel-12] INFO [c.e.d.KafkaConsumerTest] - process: value-7, partition: 1
© www.soinside.com 2019 - 2024. All rights reserved.