将
spring-webflux
与 reactor-kafka
接收器一起使用时,当 RecordDeserializationException
发生时,如何手动移动/提交偏移?从 RecordDeserializationException
我可以获取分区和偏移量,但我无法手动创建一个 ReceiverOffset
对象来允许我提交(因为它有私有实现)。
reactiveKafkaReceiver
.receiveBatch()
.onErrorResume(e -> {
RecordDeserializationException rde = (RecordDeserializationException) e;
TopicPartition topicPartition = rde.topicPartition();
long offset = rde.offset();
// how can I commit this offset?
return Flux.empty();
})
.delayUntil(flux -> flux
.collectList()
.delayUntil(this::process)
.doOnNext(records -> records.forEach(record -> record.receiverOffset()
.commit()
.subscribeOn(Schedulers.boundedElastic())
.subscribe())))
.retryWhen((Retry.backoff(3, Duration.ofSeconds(2)).transientErrors(true)))
.repeat()
.subscribe();
这里有什么解决办法吗?