我正在使用 spring reactive kafka 创建消费者,但是当出现异常时重试逻辑不起作用。
@EventListener(ApplicationReadyEvnt.class)
public void consume(){
reactiveKafkaConsumerTemplate.reciveAutoAck()
.map(ConsumerRecord::value)
.flatMap(record->consumeException(record)) //this method throws an exception
.doOnError(err->log.error("Something went wrong : {} ",err))
.retrywhen(Retry.max(3).transientErrors(true)).retry()
.subscribe();
有没有一种方法我们可以做类似于@retryable 的事情,因为它会重试实际主题并重试 retryTopic,然后将其推送 DLT?
reactive kafka 是从所有分区中提取消息,还是创建单个消费者来轮询来自单个分区的消息,如果是这样,我们可以像在传统 kafka 中那样通过设置并发性来创建多个消费者线程吗?
我尝试记录代码,但仍然不明白为什么它不起作用