ReactDeserializationException发生时如何移动到下一个偏移量reactor-kafka接收器?

问题描述 投票:0回答:1

spring-webflux
reactor-kafka
接收器一起使用时,当
RecordDeserializationException
发生时,如何手动移动/提交偏移?从
RecordDeserializationException
我可以获取分区和偏移量,但我无法手动创建一个
ReceiverOffset
对象来允许我提交(因为它有私有实现)。

   reactiveKafkaReceiver
            .receiveBatch()
            .onErrorResume(e -> {
                RecordDeserializationException rde = (RecordDeserializationException) e;
                TopicPartition topicPartition = rde.topicPartition();
                long offset = rde.offset();
                // how can I commit this offset?
                return Flux.empty();
            })
            .delayUntil(flux -> flux
                    .collectList()
                    .delayUntil(this::process)
                    .doOnNext(records -> records.forEach(record -> record.receiverOffset()
                            .commit()
                            .subscribeOn(Schedulers.boundedElastic())
                            .subscribe())))
            .retryWhen((Retry.backoff(3, Duration.ofSeconds(2)).transientErrors(true)))
            .repeat()
            .subscribe();

这里有什么解决办法吗?

java spring-webflux spring-kafka project-reactor reactor-kafka
1个回答
0
投票

请参阅此答案以获取可能的解决方法:

如何处理(spring)reactor-kafka中的反序列化错误

这可以防止失败,并允许您处理应用程序中的错误,并且偏移量照常提交。

© www.soinside.com 2019 - 2024. All rights reserved.