路由到错误频道,但得到 "调度员没有该频道的用户"。

问题描述 投票:0回答:1

我必须改变Spring Integration(4.3.12, Java DSL)中现有的流程。有一个现有的SOAP调用,之后我必须插入一个新的SOAP调用(已经完成了),如果现有的SOAP调用不成功,那么新的SOAP调用必须跳过(这是我的问题所在)。在下面的流程中 acmePreCompEnricher 是现有的呼叫和 ifMLCallRequiredEnricher 是新的一个。

    return flow -> flow.channel(ORCH_REQUEST_INPUT)
            .<HomeRequest, HomeModel>transform(requestToModelTransformer)
            ...
            // 
            .enrich(this::acmePreCompRequestEnricher)
            .enrich(this::acmePreCompEnricher)
            .handle(this.acmePreCompResponseValidator())
            // 
            .enrich(this::ifMLCallRequiredEnricher)
            // 
            .enrich(this::acmeRequestEnricher)
            .enrich(this::acmeEnricher)
            ...

所以在 acmePreCompEnricher 我设置了处理错误的错误通道。

   ContentEnricher contentEnricher = enricherSpec
            .requestPayload(Message::getPayload)
            .requestChannel(acmePreCompEnrichmentInputChannel())
            .replyChannel(acmePreCompEnrichmentOutputChannel())
            .get();
   contentEnricher.setErrorChannel(skipMLInputChannel());

   @Bean(name = "skip.ml.input")
   public MessageChannel skipMLInputChannel() {
        return MessageChannels.direct().get();
   }

如果发生SOAP故障,信息将被传送到下面的流程中。

@Bean
public IntegrationFlow processSkipML() {
    return flow -> flow.channel("skip.ml.input")
        .transform(ErrorMessage.class, (ErrorMessage m) -> {
                Message originalMessage = ((MessageHandlingException)m.getPayload()).getFailedMessage();
                return MessageBuilder.withPayload(originalMessage.getHeaders().get(HEADER_MODEL, HomeModel.class))
                         .copyHeaders(originalMessage.getHeaders())
                         .build();
                  })
        .enrich(e -> e.propertyFunction("skipMLCall", m -> true))
        .channel("enrich.ifMLCallNeeded.input");
}

后面的... ifMLCallRequiredEnricher 可以发现以下流程。

@Bean
public IntegrationFlow processIfMLCallRequiredFlow() {
    return flow -> flow.channel("enrich.ifMLCallNeeded.input")
            .route(ifMLCallRequired(), routeToMLGatewayOrBypassCall())
            .channel("enrich.ifMLCallNeeded.output");
}

流程如下: ifMLCallRequired() 检查是否 skipMLCall 是false(在出错的情况下,它在错误通道后的流程中被设置为true),它将执行新的SOAP调用,否则它将跳过它。

当没有SOAP故障时,流程会正常进行。

然而,当SOAP故障被抛出时(即消息通过错误通道),我得到以下异常。

2020-05-22 10:10:48,023 ERROR com.acme.webservice.OrchestrationServiceEndpoint Thread=qtp14486859-13 MDC=16d7cc4c-c9da-449b-8bfa-504e6d81185d Error
org.springframework.messaging.MessagingException: failure occurred in error-handling flow; nested exception is org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'enrich.ifMLCallNeeded.output'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=uk.co.acme.payload.request._2017._06.Message@4a5e6c, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@19d4520, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@19d4520, ws_soapAction=http://www.acme.co.uk/XRTEService/ProcessTran, id=902bd270-89d8-62e9-b00f-b69399241bd1, timestamp=1590138648017}], ...}]
    at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:489)
    at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:426)
    at org.springframework.integration.transformer.ContentEnricher$Gateway.sendAndReceiveMessage(ContentEnricher.java:481)
    at org.springframework.integration.transformer.ContentEnricher.handleRequestMessage(ContentEnricher.java:383)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)

所以当没有SOAP故障的时候,一切都很正常,所以这个通道 enrich.ifMLCallNeeded.output 有订阅者,这是下一个充实者,请看下面的日志记录。

2020-05-24 20:37:58,819 INFO  org.springframework.integration.channel.DirectChannel Thread=qtp14486859-10 MDC=16d7cc4c-c9da-449b-8bfa-504e6d81185d Channel 'enrich.ifMLCallNeeded.output' has 1 subscriber(s).

但是当SOAP故障出现时,该通道没有订阅者(我找不到任何日志记录)。我认为这是因为我试图用错误通道劫持流。但在这种情况下,我可以做什么呢?

我希望得到任何帮助,因为我现在被卡住了。非常感谢您的帮助

Regards, V.

spring-integration spring-integration-dsl
1个回答
0
投票

下面是一个如何正确处理丰富子流上的错误的例子。

@SpringBootApplication
public class So61991580Application {

    public static void main(String[] args) {
        SpringApplication.run(So61991580Application.class, args);
    }

    private final AtomicBoolean which = new AtomicBoolean();

    @Bean
    public IntegrationFlow flow() {
        return IntegrationFlows.from(() -> new Foo(this.which.getAndSet(!which.get()) ? "foo" : "qux"),
                    e -> e.poller(Pollers.fixedDelay(5000)))
                .enrich(spec -> spec.requestChannel("soap1.input")
                        .errorChannel("soap1Error.input"))
                .route("payload.bar", r -> r
                        .channelMapping("good", "soap2.input")
                        .channelMapping("bad", "cleanUp.input"))
                .get();
    }

    @Bean
    public IntegrationFlow soap1() {
        return f -> f
                .handle(Foo.class, (payload, headers) -> {
                    if (payload.getFoo().equals("foo")) {
                        throw new RuntimeException("test enrich failure");
                    }
                    payload.setBar("good");
                    return payload;
                });
    }

    @Bean
    public IntegrationFlow soap2() {
        return f -> f
                .handle(Foo.class, (payload, headers) -> {
                    payload.setBaz("soap2");
                    return payload;
                })
                .channel("cleanUp.input");
    }

    @Bean
    public IntegrationFlow soap1Error() {
        return f -> f.<MessagingException, Foo>transform(ex -> {
            Foo foo = (Foo) ex.getFailedMessage().getPayload();
            foo.setBar("bad");
            return foo;
        });
    }

    @Bean
    public IntegrationFlow cleanUp() {
        return f -> f.log();
    }

    public static class Foo {

        private final String foo;

        private String bar;

        private String baz;

        public Foo(String foo) {
            this.foo = foo;
        }

        public String getFoo() {
            return this.foo;
        }

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        public String getBaz() {
            return this.baz;
        }

        public void setBaz(String baz) {
            this.baz = baz;
        }

        @Override
        public String toString() {
            return "Foo [foo=" + this.foo + ", bar=" + this.bar + ", baz=" + this.baz + "]";
        }

    }

}
GenericMessage [payload=Foo [foo=foo, bar=bad, baz=null], headers={id=e5a943c7-dcf1-47f3-436e-5d0350a1c6f5, timestamp=1590511422083}]
GenericMessage [payload=Foo [foo=qux, bar=good, baz=soap2], headers={id=a99d7ddb-2f40-f0f7-08b6-6340563e011d, timestamp=1590511427086}]
2
© www.soinside.com 2019 - 2024. All rights reserved.