在正确的线程上执行 onErrorResume

问题描述 投票:0回答:0

我正在实施死信逻辑,但在使用

webClient

调用外部服务时发生异常后,无法确认记录
orderConsumer
    .receive()
    .concatMap(
        orderRecord ->
            orderService
                .process(orderRecord) // use webClient to make http calls
                .onErrorMap(
                    throwable -> {
                      if (throwable instanceof NonRetryableExceptionMarker) {
                        return new NonRetryableException(throwable, orderRecord );
                      }
                      return throwable;
                    }))
    .onErrorResume(
        throwable -> {
          if (throwable instanceof NonRetryableException nonRetryableException) {
            deadLetterPublishingRecoverer.accept(
                nonRetryableException.getReceiverRecord(), nonRetryableException);
            nonRetryableException.getReceiverRecord().receiverOffset().acknowledge();
          }
          return Flux.empty();
        })
    .repeat();

消息消费在线程

[reactive-kafka-orderGroup-1]
上开始,调用
orderService.process(orderRecord)
后切换到
[reactor-http-nio-2]
,

当出现错误(404错误)时,它会在同一个线程上继续

[reactor-http-nio-2]
并调用
nonRetryableException.getReceiverRecord().receiverOffset().acknowledge();
但消息未提交,

确认调用必须在

[reactive-kafka-orderGroup-1]
线程上完成,因为我在调用外部服务之前尝试模拟异常并且它按预期工作

P.S:在日志中我可以看到

[reactive-kafka-orderGroup-1]
线程停止(因为
repeat()
)在另一个线程`[reactor-http-nio-2]
上调用
.acknowledge()

之前
java apache-kafka reactive-programming spring-kafka project-reactor
© www.soinside.com 2019 - 2024. All rights reserved.