附加到服务激活器的 Spring Retry Recovery 不是处理链的一部分

问题描述 投票:0回答:1

我正在使用服务激活器的 Spring 集成链来处理来自队列的传入消息。保留传入消息的服务激活器之一

messagePersister
。如果此服务激活器失败,则会出现重试建议,再尝试该操作 3 次。这部分工作正常,但如果所有重试都失败,我们有一个恢复方法,可以以备用形式保留消息(还会触发一些通知等)。此恢复方法和原始持久化方法返回同一类的对象,然后需要由预处理器(链中的下一个服务激活器)处理。 然而,看起来,使用恢复选项会导致消息离开链,并且恢复服务激活器的返回对象不会沿着链走下去。同样,如果恢复方法抛出异常,它不会转到
redboxExceptionChannel
,这是正在侦听传入队列的适配器的异常。

 <int-jms:message-driven-channel-adapter
    id="inputChannelAdapter"
    connection-factory="jmsConnectionFactory"
    destination-name="REDBOX_IN"
    channel="redboxIncomingChannel"
    max-concurrent-consumers="1"
    auto-startup="true"
    acknowledge="transacted"
    receive-timeout="20000"
    error-channel="redboxExceptionChannel"/>

  <int:chain id="redboxIncomingChannelProcessingChain"
    input-channel="redboxIncomingChannel"
    output-channel="redboxOutgoingMessageChannel">
    <int:service-activator ref="messagePersister"
      method="persistAndAddClientMessageIdToHeader">
      <int:request-handler-advice-chain>
        <int:retry-advice max-attempts="4" recovery-channel="persistenceRetriesExhaustedChannel" >
          <int:exponential-back-off initial="800"
            multiplier="3"
            maximum="25000"/>
        </int:retry-advice>
      </int:request-handler-advice-chain>
    </int:service-activator>
    <int:service-activator ref="redboxPreProcessor" method="validate"/>
    <int:service-activator ref="redboxProcessor" method="evaluateRules"/>
  </int:chain>

  <int:service-activator ref="messagePersister"
    method="retriesExhausted" input-channel="persistenceRetriesExhaustedChannel"  />

我期望恢复方法成为触发重试的链的一部分。

spring-integration spring-jms spring-retry
1个回答
0
投票

行为是正确的。

ErrorMessageSendingRecoverer
的逻辑是这样的:

@Override
public Object recover(RetryContext context) {
    publish(context.getLastThrowable(), context);
    return null;
}

所以,它就不会返回。此时不知道您的服务激活器正在生成回复。

您可以这样解决问题:

在此时添加一个

<gateway>
,并通过重试将您的服务激活器提取到一个独立组件中,该组件具有输入通道作为上述
request-channel
中的
gateway

然后,您的

messagePersister.retriesExhausted
必须在从此方法返回之前查看
MessagingException.failedMessage
来复制其标头。这样,
replyChannel
就会出现,端点就会知道将方法的结果发送到哪里。这个
replyChannel
就是那个
gateway
等待回复的地方。这样,您就得到了原始服务激活者的正常回复和
persistenceRetriesExhaustedChannel
订阅者的补偿。

更新

关于恢复器子流程中的错误。 根据我的测试,它按预期工作:

@SpringBootApplication
public class So78089892Application {

    public static void main(String[] args) {
        SpringApplication.run(So78089892Application.class, args);
    }

    @Bean
    ApplicationRunner sendToJms(JmsTemplate jmsTemplate) {
        return args -> jmsTemplate.convertAndSend("REDBOX_IN", "test data");
    }

    @Bean
    JmsMessageDrivenChannelAdapterSpec<?> inputChannelAdapter(ConnectionFactory jmsConnectionFactory) {
        return Jms.messageDrivenChannelAdapter(jmsConnectionFactory)
                .destination("REDBOX_IN")
                .outputChannel("redboxIncomingChannel")
                .errorChannel("redboxExceptionChannel");
    }

    @ServiceActivator(inputChannel = "redboxExceptionChannel")
    void handleErrors(Exception exception) {
        System.out.println("Error Received: \n" + exception);
    }

    @Bean
    IntegrationFlow redboxIncomingChannelProcessingChain(RequestHandlerRetryAdvice retryAdvice) {
        return IntegrationFlow
                .from("redboxIncomingChannel")
                .gateway((subFlow) -> subFlow
                        .handle((p, h) -> {
                            throw new RuntimeException("Persistence failed");
                        }), e -> e.advice(retryAdvice))
                .get();
    }

    @Bean
    RequestHandlerRetryAdvice retryAdvice(MessageChannel persistenceRetriesExhaustedChannel) {
        RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
        requestHandlerRetryAdvice.setRecoveryCallback(
                new ErrorMessageSendingRecoverer(persistenceRetriesExhaustedChannel));
        return requestHandlerRetryAdvice;

    }

    @Bean
    DirectChannel persistenceRetriesExhaustedChannel() {
        return new DirectChannel();
    }

    @ServiceActivator(inputChannel = "persistenceRetriesExhaustedChannel")
    void retriesExhausted(Exception exception) {
        throw new RuntimeException("Cannot recover", exception);
    }

}

正如您在最后一个

retriesExhausted()
方法中看到的,我故意根据来自刚刚失败的处理程序的异常抛出一些异常,并提供重试建议。

最后我从该

handleErrors()
方法中得到了这样的日志:

Error Received: 
org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@53b41cc8], failedMessage=ErrorMessage [payload=org.springframework.messaging.MessagingException: Failed to handle, failedMessage=GenericMessage [payload=test data, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=ActiveMQQueue[REDBOX_IN], id=5ff5cbc6-f585-c113-a9c3-40a741d0cc7f, priority=4, jms_timestamp=1709326985025, jms_messageId=ID:18fa9c02-d80f-11ee-8409-00155d933a76, timestamp=1709326985042}], headers={id=9eb9a300-d058-da1c-8315-432c2ae0cb34, timestamp=1709326985055}]

(对 Java DSL 变体感到抱歉:我有一段时间没有使用 XML 配置了)。

我们的配置可能会有所不同。例如,您的

persistenceRetriesExhaustedChannel
不是
DirectChannel
...

© www.soinside.com 2019 - 2024. All rights reserved.