如何处理JmsChannelFactoryBean错误,是否有可能使用自定义错误通道?

问题描述 投票:1回答:1

我具有用于创建两个通道的以下配置(通过使用JmsChannelFactoryBean):

@Bean
public JmsChannelFactoryBean jmsChannel(ActiveMQConnectionFactory activeMQConnectionFactory) {
    JmsChannelFactoryBean fb = new JmsChannelFactoryBean(true);
    fb.setConnectionFactory(activeMQConnectionFactory);
    fb.setDestinationName("something.queue");
    fb.setErrorHandler(t -> log.error("something went wrong on jms channel", t));
    return fb;
}

@Bean
public JmsChannelFactoryBean jmsChannelDLQ(ActiveMQConnectionFactory activeMQConnectionFactory) {
    JmsChannelFactoryBean fb = new JmsChannelFactoryBean(true);
    fb.setConnectionFactory(activeMQConnectionFactory);
    fb.setDestinationName("something.queue.DLQ");
    fb.setErrorHandler(t -> log.error("something went wrong on jms channel", t));
    return fb;
}

something.queue配置为将死信放在something.queue.DLQ上。我主要使用Java DSL来配置应用程序,如果可能的话,我希望保留此设置。

情况是:消息是从jmsChannel提取的,并放在sftp出站网关中,如果在发送文件时出现问题,则消息将被重新发送到jmsChannel中,而不进行发送。重试后,它被设计为有毒的,并放入东西。queue.DLQ。

  1. 发生这种情况时是否有可能在错误通道上获得信息?
  2. 使用JMS支持的消息通道时处理错误的最佳实践是什么?

编辑2

集成流程定义为:

IntegrationFlows.from(filesToProcessChannel).handle(outboundGateway)

其中filesToProcessChannel是JMS支持的通道,出站网关定义为:

@Bean
public SftpOutboundGateway outboundGateway(SftpRemoteFileTemplate sftpRemoteFileTemplate) {
    SftpOutboundGateway gateway = new SftpOutboundGateway(sftpRemoteFileTemplate, AbstractRemoteFileOutboundGateway.Command.PUT.getCommand(), EXPRESSION_PAYLOAD);
    ArrayList<Advice> adviceChain = new ArrayList<>();
    adviceChain.add(errorHandlingAdvice());
    gateway.setAdviceChain(adviceChain);
    return gateway;
}

我正在尝试使用建议来捕获异常:

@Bean
public Advice errorHandlingAdvice() {
    RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
    RetryTemplate retryTemplate = new RetryTemplate();
    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(1);
    retryTemplate.setRetryPolicy(retryPolicy);
    advice.setRetryTemplate(retryTemplate);
    advice.setRecoveryCallback(new ErrorMessageSendingRecoverer(filesToProcessErrorChannel));
    return advice;
}

这是正确的方法吗?

编辑3

[SFTPOutboundGateway和建议(或我:/)确实存在问题:我使用了Spring集成参考中的以下建议:

@Bean
public Advice expressionAdvice() {
    ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setSuccessChannelName("success.input");
    advice.setOnSuccessExpressionString("payload + ' was successful'");
    advice.setFailureChannelName("failure.input");
    advice.setOnFailureExpressionString(
            "payload + ' was bad, with reason: ' + #exception.cause.message");
    advice.setTrapException(true);
    return advice;
}

@Bean
public IntegrationFlow success() {
    return f -> f.handle(System.out::println);
}

@Bean
public IntegrationFlow failure() {
    return f -> f.handle(System.out::println);
}

以及当我使用:

return IntegrationFlows.from(filesToProcessChannel)
            .handle((GenericHandler<File>) (payload, headers) -> {
                if (payload.equals("x")) {
                    return null;
                }
                else {
                    throw new RuntimeException("some failure");
                }
            }, spec -> spec.advice(expressionAdvice()))

它被调用,并且我得到打印出来的错误消息(这是预期的),但是当我尝试使用时:

return IntegrationFlows.from(filesToProcessChannel)
            .handle(outboundGateway, spec -> spec.advice(expressionAdvice()))

未调用建议,并且错误消息被返回给JMS。

该应用程序正在使用Spring Boot v2.0.0.RELEASE,Spring v5.0.4.RELEASE。

编辑4

我设法使用以下配置解决了建议问题,但仍然不明白为什么处理程序规范无法正常工作:

@Bean
IntegrationFlow files(SftpOutboundGateway outboundGateway,
                      ...
) {
    return IntegrationFlows.from(filesToProcessChannel)
            .handle(outboundGateway)
            ...
            .log(LoggingHandler.Level.INFO)
            .get();
}

@Bean
public SftpOutboundGateway outboundGateway(SftpRemoteFileTemplate sftpRemoteFileTemplate) {
    SftpOutboundGateway gateway = new SftpOutboundGateway(sftpRemoteFileTemplate, AbstractRemoteFileOutboundGateway.Command.PUT.getCommand(), EXPRESSION_PAYLOAD);
    ArrayList<Advice> adviceChain = new ArrayList<>();
    adviceChain.add(expressionAdvice());
    gateway.setAdviceChain(adviceChain);
    return gateway;
}


@Bean
public ExpressionEvaluatingRequestHandlerAdvice expressionAdvice() {
    ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setSuccessChannelName("success.input");
    advice.setOnSuccessExpressionString("payload + ' was successful'");
    advice.setFailureChannelName("failure.input");
    advice.setOnFailureExpressionString(
            "payload + ' was bad, with reason: ' + #exception.cause.message");
    advice.setTrapException(true);
    return advice;
}

@Bean
public IntegrationFlow success() {
    return f -> f.handle(System.out::println);
}

@Bean
public IntegrationFlow failure() {
    return f -> f.handle(System.out::println);
}
java spring-integration spring-jms
1个回答
1
投票

由于移动到DLQ的操作是由代理执行的,因此应用程序没有机制来记录情况-甚至不知道它发生了。

[您将不得不自己捕获异常,并在尝试几次(JMSXDeliveryCount标头)之后自行使用DLQ来发布消息,而不是使用代理策略。

编辑

Advice添加到.handle()步骤。

.handle(outboundGateway, e -> e.advice(myAdvice))

[myAdvice实现MethodInterceptor的地方。

invoke方法中,失败后,您可以检查传递计数标头,如果超过阈值,则将消息发布到DLQ(例如,将其发送到已预订JMS出站适配器的另一个通道),然后记录错误;如果未超过阈值,则只需返回invocation.proceed()的结果(或重新引发异常)。

这样,您可以控制发布到DLQ而不是由代理执行。您还可以在标头中添加更多信息,例如异常。

EDIT2

您需要这样的东西

public class MyAdvice implements MethodInterceptor {

    @Autowired
    private MessageChannel toJms;

    public Object invoke(MethodInvocation invocation) throws Throwable {
        try {
            return invocation.proceed();
        }
        catch Exception(e) {
            Message<?> message = (Message<?>) invocation.getArguments()[0];
            Integer redeliveries = messasge.getHeader("JMXRedeliveryCount", Integer.class);
            if (redeliveries != null && redeliveries > 3) {
                this.toJms.send(message); // maybe rebuild with additional headers about the error
            }
            else {
                throw e;
            }
        }
    }
}

(应该很近,但是我还没有测试)。假定您的经纪人填充了该标头。

© www.soinside.com 2019 - 2024. All rights reserved.