我具有用于创建两个通道的以下配置(通过使用JmsChannelFactoryBean):
@Bean
public JmsChannelFactoryBean jmsChannel(ActiveMQConnectionFactory activeMQConnectionFactory) {
JmsChannelFactoryBean fb = new JmsChannelFactoryBean(true);
fb.setConnectionFactory(activeMQConnectionFactory);
fb.setDestinationName("something.queue");
fb.setErrorHandler(t -> log.error("something went wrong on jms channel", t));
return fb;
}
@Bean
public JmsChannelFactoryBean jmsChannelDLQ(ActiveMQConnectionFactory activeMQConnectionFactory) {
JmsChannelFactoryBean fb = new JmsChannelFactoryBean(true);
fb.setConnectionFactory(activeMQConnectionFactory);
fb.setDestinationName("something.queue.DLQ");
fb.setErrorHandler(t -> log.error("something went wrong on jms channel", t));
return fb;
}
something.queue配置为将死信放在something.queue.DLQ上。我主要使用Java DSL来配置应用程序,如果可能的话,我希望保留此设置。
情况是:消息是从jmsChannel提取的,并放在sftp出站网关中,如果在发送文件时出现问题,则消息将被重新发送到jmsChannel中,而不进行发送。重试后,它被设计为有毒的,并放入东西。queue.DLQ。
编辑2
集成流程定义为:
IntegrationFlows.from(filesToProcessChannel).handle(outboundGateway)
其中filesToProcessChannel是JMS支持的通道,出站网关定义为:
@Bean
public SftpOutboundGateway outboundGateway(SftpRemoteFileTemplate sftpRemoteFileTemplate) {
SftpOutboundGateway gateway = new SftpOutboundGateway(sftpRemoteFileTemplate, AbstractRemoteFileOutboundGateway.Command.PUT.getCommand(), EXPRESSION_PAYLOAD);
ArrayList<Advice> adviceChain = new ArrayList<>();
adviceChain.add(errorHandlingAdvice());
gateway.setAdviceChain(adviceChain);
return gateway;
}
我正在尝试使用建议来捕获异常:
@Bean
public Advice errorHandlingAdvice() {
RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
RetryTemplate retryTemplate = new RetryTemplate();
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(1);
retryTemplate.setRetryPolicy(retryPolicy);
advice.setRetryTemplate(retryTemplate);
advice.setRecoveryCallback(new ErrorMessageSendingRecoverer(filesToProcessErrorChannel));
return advice;
}
这是正确的方法吗?
编辑3
[SFTPOutboundGateway和建议(或我:/)确实存在问题:我使用了Spring集成参考中的以下建议:
@Bean
public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setSuccessChannelName("success.input");
advice.setOnSuccessExpressionString("payload + ' was successful'");
advice.setFailureChannelName("failure.input");
advice.setOnFailureExpressionString(
"payload + ' was bad, with reason: ' + #exception.cause.message");
advice.setTrapException(true);
return advice;
}
@Bean
public IntegrationFlow success() {
return f -> f.handle(System.out::println);
}
@Bean
public IntegrationFlow failure() {
return f -> f.handle(System.out::println);
}
以及当我使用:
return IntegrationFlows.from(filesToProcessChannel)
.handle((GenericHandler<File>) (payload, headers) -> {
if (payload.equals("x")) {
return null;
}
else {
throw new RuntimeException("some failure");
}
}, spec -> spec.advice(expressionAdvice()))
它被调用,并且我得到打印出来的错误消息(这是预期的),但是当我尝试使用时:
return IntegrationFlows.from(filesToProcessChannel)
.handle(outboundGateway, spec -> spec.advice(expressionAdvice()))
未调用建议,并且错误消息被返回给JMS。
该应用程序正在使用Spring Boot v2.0.0.RELEASE,Spring v5.0.4.RELEASE。
编辑4
我设法使用以下配置解决了建议问题,但仍然不明白为什么处理程序规范无法正常工作:
@Bean
IntegrationFlow files(SftpOutboundGateway outboundGateway,
...
) {
return IntegrationFlows.from(filesToProcessChannel)
.handle(outboundGateway)
...
.log(LoggingHandler.Level.INFO)
.get();
}
@Bean
public SftpOutboundGateway outboundGateway(SftpRemoteFileTemplate sftpRemoteFileTemplate) {
SftpOutboundGateway gateway = new SftpOutboundGateway(sftpRemoteFileTemplate, AbstractRemoteFileOutboundGateway.Command.PUT.getCommand(), EXPRESSION_PAYLOAD);
ArrayList<Advice> adviceChain = new ArrayList<>();
adviceChain.add(expressionAdvice());
gateway.setAdviceChain(adviceChain);
return gateway;
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setSuccessChannelName("success.input");
advice.setOnSuccessExpressionString("payload + ' was successful'");
advice.setFailureChannelName("failure.input");
advice.setOnFailureExpressionString(
"payload + ' was bad, with reason: ' + #exception.cause.message");
advice.setTrapException(true);
return advice;
}
@Bean
public IntegrationFlow success() {
return f -> f.handle(System.out::println);
}
@Bean
public IntegrationFlow failure() {
return f -> f.handle(System.out::println);
}
由于移动到DLQ的操作是由代理执行的,因此应用程序没有机制来记录情况-甚至不知道它发生了。
[您将不得不自己捕获异常,并在尝试几次(JMSXDeliveryCount
标头)之后自行使用DLQ来发布消息,而不是使用代理策略。
编辑
将Advice
添加到.handle()
步骤。
.handle(outboundGateway, e -> e.advice(myAdvice))
[myAdvice
实现MethodInterceptor
的地方。
在invoke
方法中,失败后,您可以检查传递计数标头,如果超过阈值,则将消息发布到DLQ(例如,将其发送到已预订JMS出站适配器的另一个通道),然后记录错误;如果未超过阈值,则只需返回invocation.proceed()
的结果(或重新引发异常)。
这样,您可以控制发布到DLQ而不是由代理执行。您还可以在标头中添加更多信息,例如异常。
EDIT2
您需要这样的东西
public class MyAdvice implements MethodInterceptor {
@Autowired
private MessageChannel toJms;
public Object invoke(MethodInvocation invocation) throws Throwable {
try {
return invocation.proceed();
}
catch Exception(e) {
Message<?> message = (Message<?>) invocation.getArguments()[0];
Integer redeliveries = messasge.getHeader("JMXRedeliveryCount", Integer.class);
if (redeliveries != null && redeliveries > 3) {
this.toJms.send(message); // maybe rebuild with additional headers about the error
}
else {
throw e;
}
}
}
}
(应该很近,但是我还没有测试)。假定您的经纪人填充了该标头。