Spring反应流-意外关闭

问题描述 投票:0回答:1

我们正在通过RabbitMQ使用Spring Cloud反应流。

Spring Reactive Stream似乎在将消息从队列中拉出后立即对其进行确认。因此,在消息处理期间发生的任何错误未处理的异常都需要在应用程序中进行处理(这与非响应流不同,在非响应流中,可能会抛出未处理的异常并拒绝一条消息,从而将其发送到死信队列中)。

在传送消息时,我们应该如何处理应用程序中的突然关闭?

例如:

  • 应用程序将消息从队列中拉出
  • 应用程序将消息标记为已确认
  • 应用程序开始消息处理
  • 在完成消息处理之前关闭应用程序

发生这种情况时,该消息似乎已完全丢失,因为它不在队列中,但应用程序已停止。我们如何恢复这些消息?

spring spring-cloud-stream spring-rabbitmq reactive-streams spring-reactive
1个回答
1
投票

您需要使用手动确认并推迟确认,直到异步完成处理。为此,您需要使用整个消息:

    @Bean
    public Consumer<Flux<Message<String>>> async() {
        return inbound -> inbound

                ...

                .map(msg -> {
                    try {
                        msg.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class)
                                .basicAck(msg.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class), false);
                    }
                    catch (IOException e) {
                        e.printStackTrace();
                    }
                    return msg.getPayload();
                })
                .subscribe(System.out::println);
    }
spring:
  cloud:
    stream:
      function.definition: async
      bindings:
        async-in-0:
          destination: testtock
          group: async
      rabbit:
        bindings:
          async-in-0:
            consumer:
              acknowledge-mode: MANUAL
              prefetch: 10

使用basicReject重新排队或发送到DLQ。

© www.soinside.com 2019 - 2024. All rights reserved.