import org.springframework.boot.SpringApplication;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.jms.activemq.ActiveMQAutoConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.PollerSpec;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.messaging.Message;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.policy.MaxAttemptsRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
@SpringBootConfiguration
@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class, ActiveMQAutoConfiguration.class})
public class SpringIntegrationTestApplication {
public static void main2(String[] args) {
RetryTemplate template = new RetryTemplate();
RetryPolicy policy = new MaxAttemptsRetryPolicy(200);
template.setRetryPolicy(policy);
template.execute(context -> {
System.out.println("Trying");
throw new RuntimeException("problem");
});
}
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext ctx = SpringApplication.run(SpringIntegrationTestApplication.class, args);
Cafe cafe = ctx.getBean(Cafe.class);
Order entirelyGoodOrder = new Order(1);
entirelyGoodOrder.addItem(DrinkType.LATTE, 3, true);
entirelyGoodOrder.addItem(DrinkType.ESPRESSO, 3, true);
entirelyGoodOrder.addItem(DrinkType.MOCHA, 3, true);
cafe.placeOrder(entirelyGoodOrder);
System.out.println("Hit 'Enter' to terminate");
System.in.read();
ctx.close();
}
@MessagingGateway
public interface Cafe {
@Gateway(requestChannel = "orders.input")
void placeOrder(Order order);
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
return Pollers.fixedDelay(1000);
}
@Bean
public IntegrationFlow orders(IntegrationFlow businessLogic, IntegrationFlow errorHandler123, IntegrationFlow publishingGateway) {
return f -> f
.log(LoggingHandler.Level.INFO)
.split(Order.class, Order::getItems)
.gateway(businessLogic, gatewayEndpointSpec -> {
gatewayEndpointSpec.advice(retry());
})
.route(Message.class, message -> message.getPayload() instanceof ExceptionalMessage,
router -> router
.subFlowMapping(true, errorHandler123)
.channelMapping(false, "nullChannel"));
}
public static RequestHandlerRetryAdvice retry() {
RetryTemplate template = new RetryTemplate();
RetryPolicy policy = new MaxAttemptsRetryPolicy(3);
template.setRetryPolicy(policy);
RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
requestHandlerRetryAdvice.setRetryTemplate(template);
requestHandlerRetryAdvice.setRecoveryCallback(context -> new ExceptionalMessage(context.getLastThrowable()));
return requestHandlerRetryAdvice;
}
@Bean
public IntegrationFlow businessLogic() {
return f -> f
.handle((payload, headers) -> {
OrderItem orderItem = (OrderItem) payload;
if (orderItem.getDrinkType() == DrinkType.LATTE) {
String message = String.format("throwing exception from first channel logic orderId=%s, drinkType=%s", orderItem.getOrder().getNumber(), orderItem.getDrinkType());
System.out.println(message);
throw new RuntimeException("Broken order first channel");
} else {
String message = String.format("Processed inside first channel. orderId=%s, drinkType=%s", orderItem.getOrder().getNumber(), orderItem.getDrinkType());
System.out.println(message);
return orderItem;
}
})
.channel("publishingChannel");
}
@Bean
public IntegrationFlow publishingGateway() {
return IntegrationFlows.from("publishingChannel")
.filter(source -> source.equals(1), spec -> {
spec.requiresReply(true);
})
.handle((payload, header) -> {
System.out.println("publishingGateway CALLED");
OrderItem orderItem = (OrderItem) payload;
if (orderItem.getDrinkType() == DrinkType.ESPRESSO) {
String message = String.format("throwing exception from publishingGateway channel logic orderId=%s, drinkType=%s", orderItem.getOrder().getNumber(), orderItem.getDrinkType());
System.out.println(message);
throw new RuntimeException("Broken order publishingGateway channel");
} else {
String message = String.format("Processed inside publishingGateway channel. orderId=%s, drinkType=%s", orderItem.getOrder().getNumber(), orderItem.getDrinkType());
System.out.println(message);
return orderItem;
}
})
.get();
}
@Bean
public IntegrationFlow errorHandler123() {
return f -> f
.log(LoggingHandler.Level.ERROR, message -> String.format("Message being dropped. %s", message.getPayload()));
}
}
在没有 requireReply(true) 的情况下添加过滤器后,在发布网关内部,管道开始无限等待。
如果我删除过滤器,它会按预期工作。
尝试添加spec.requiresReply(true);但它会抛出错误 ReplyRequiredException:处理程序未生成回复
消息被丢弃。 ExceptionalMessage[exceptionMessage=org.springframework.messaging.MessagingException:处理失败;嵌套异常是 org.springframework.integration.handler.ReplyRequiredException:处理程序“publishingGateway.org.springframework.integration.config.ConsumerEndpointFactoryBean#0”没有生成回复,并且其“requiresReply”属性设置为 true。,
默认情况下,网关会无限期等待回复;如果您过滤掉该请求,将不会得到回复。
设置网关上的
replyTimeout
属性;只要您使用默认(直接)通道(因此所有内容都在调用线程上运行),您就可以安全地将超时设置为 0。