Spring Integration DSL IntegrationFlow filter() 不返回任何内容并无限等待返回

问题描述 投票:0回答:1
import org.springframework.boot.SpringApplication;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.jms.activemq.ActiveMQAutoConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.PollerSpec;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.messaging.Message;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.policy.MaxAttemptsRetryPolicy;
import org.springframework.retry.support.RetryTemplate;


@SpringBootConfiguration
@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class, ActiveMQAutoConfiguration.class})
public class SpringIntegrationTestApplication {

    public static void main2(String[] args) {
        RetryTemplate template = new RetryTemplate();
        RetryPolicy policy = new MaxAttemptsRetryPolicy(200);
        template.setRetryPolicy(policy);
        template.execute(context -> {
            System.out.println("Trying");
            throw new RuntimeException("problem");
        });
    }

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext ctx = SpringApplication.run(SpringIntegrationTestApplication.class, args);

        Cafe cafe = ctx.getBean(Cafe.class);

        Order entirelyGoodOrder = new Order(1);
        entirelyGoodOrder.addItem(DrinkType.LATTE, 3, true);
        entirelyGoodOrder.addItem(DrinkType.ESPRESSO, 3, true);
        entirelyGoodOrder.addItem(DrinkType.MOCHA, 3, true);

        cafe.placeOrder(entirelyGoodOrder);

        System.out.println("Hit 'Enter' to terminate");
        System.in.read();
        ctx.close();
    }

    @MessagingGateway
    public interface Cafe {

        @Gateway(requestChannel = "orders.input")
        void placeOrder(Order order);
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerSpec poller() {
        return Pollers.fixedDelay(1000);
    }

    @Bean
    public IntegrationFlow orders(IntegrationFlow businessLogic, IntegrationFlow errorHandler123, IntegrationFlow publishingGateway) {
        return f -> f
                .log(LoggingHandler.Level.INFO)
                .split(Order.class, Order::getItems)
                .gateway(businessLogic, gatewayEndpointSpec -> {
                    gatewayEndpointSpec.advice(retry());
                })
                .route(Message.class, message -> message.getPayload() instanceof ExceptionalMessage,
                        router -> router
                                .subFlowMapping(true, errorHandler123)
                                .channelMapping(false, "nullChannel"));
    }

    public static RequestHandlerRetryAdvice retry() {
        RetryTemplate template = new RetryTemplate();
        RetryPolicy policy = new MaxAttemptsRetryPolicy(3);
        template.setRetryPolicy(policy);
        RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
        requestHandlerRetryAdvice.setRetryTemplate(template);
        requestHandlerRetryAdvice.setRecoveryCallback(context -> new ExceptionalMessage(context.getLastThrowable()));
        return requestHandlerRetryAdvice;
    }

    @Bean
    public IntegrationFlow businessLogic() {
        return f -> f

                .handle((payload, headers) -> {
                    OrderItem orderItem = (OrderItem) payload;
                    if (orderItem.getDrinkType() == DrinkType.LATTE) {
                        String message = String.format("throwing exception from first channel logic orderId=%s, drinkType=%s", orderItem.getOrder().getNumber(), orderItem.getDrinkType());
                        System.out.println(message);

                        throw new RuntimeException("Broken order first channel");
                    } else {
                        String message = String.format("Processed inside first channel. orderId=%s, drinkType=%s", orderItem.getOrder().getNumber(), orderItem.getDrinkType());
                        System.out.println(message);
                        return orderItem;
                    }
                })
                .channel("publishingChannel");
    }

    @Bean
    public IntegrationFlow publishingGateway() {
        return IntegrationFlows.from("publishingChannel")
                .filter(source -> source.equals(1),  spec -> {
                    spec.requiresReply(true);

                })
                .handle((payload, header) -> {
                    System.out.println("publishingGateway CALLED");

                    OrderItem orderItem = (OrderItem) payload;
                    if (orderItem.getDrinkType() == DrinkType.ESPRESSO) {
                        String message = String.format("throwing exception from publishingGateway channel logic orderId=%s, drinkType=%s", orderItem.getOrder().getNumber(), orderItem.getDrinkType());
                        System.out.println(message);

                        throw new RuntimeException("Broken order publishingGateway channel");
                    } else {
                        String message = String.format("Processed inside publishingGateway channel. orderId=%s, drinkType=%s", orderItem.getOrder().getNumber(), orderItem.getDrinkType());
                        System.out.println(message);
                        return orderItem;
                    }
                })
                .get();
    }

    @Bean
    public IntegrationFlow errorHandler123() {
        return f -> f
                .log(LoggingHandler.Level.ERROR, message -> String.format("Message being dropped. %s", message.getPayload()));
    }
}

在没有 requireReply(true) 的情况下添加过滤器后,在发布网关内部,管道开始无限等待。

如果我删除过滤器,它会按预期工作。

尝试添加spec.requiresReply(true);但它会抛出错误 ReplyRequiredException:处理程序未生成回复

消息被丢弃。 ExceptionalMessage[exceptionMessage=org.springframework.messaging.MessagingException:处理失败;嵌套异常是 org.springframework.integration.handler.ReplyRequiredException:处理程序“publishingGateway.org.springframework.integration.config.ConsumerEndpointFactoryBean#0”没有生成回复,并且其“requiresReply”属性设置为 true。,

spring spring-integration-dsl
1个回答
0
投票

默认情况下,网关会无限期等待回复;如果您过滤掉该请求,将不会得到回复。

设置网关上的

replyTimeout
属性;只要您使用默认(直接)通道(因此所有内容都在调用线程上运行),您就可以安全地将超时设置为 0。

© www.soinside.com 2019 - 2024. All rights reserved.