延迟的入站适配器和控制总线

问题描述 投票:0回答:1

我的集成流程代码是:

@Bean
    public IntegrationFlow messageFlow() {
        return IntegrationFlows.from(stompInboundChannelAdapter())
                .transform(inBoundStompMsgTransformer::transform)
                .headerFilter("stomp_subscription","content-length")
                .handle(Amqp.outboundAdapter(outboundConfiguration.rabbitTemplate()))
                .get();
    }

我正在使用Spring Boot。

清除日志表明已将{transformer}用户添加到输入通道

2019-12-09 18:21:41.752  INFO 18248 --- [           main] o.s.i.s.i.StompInboundChannelAdapter     : started bean 'stompInboundChannelAdapter'; defined in: 'class path resource [com/mahesh/siuspdemo/adapter/InBoundStompAdapter.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@21e360a'
2019-12-09 18:21:41.768  INFO 18248 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {transformer} as a subscriber to the 'stompInputChannel' channel
2019-12-09 18:21:41.768  INFO 18248 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application.stompInputChannel' has 1 subscriber(s).
2019-12-09 18:21:41.768  INFO 18248 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean 'inboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/mahesh/siuspdemo/adapter/InBoundStompAdapter.class]'; from source: 'bean method inboundFlow'
2019-12-09 18:21:41.768  INFO 18248 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {header-filter} as a subscriber to the 'inboundFlow.channel#0' channel
2019-12-09 18:21:41.772  INFO 18248 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application.inboundFlow.channel#0' has 1 subscriber(s).
2019-12-09 18:21:41.772  INFO 18248 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean 'inboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#1'; defined in: 'class path resource [com/mahesh/siuspdemo/adapter/InBoundStompAdapter.class]'; from source: 'bean method inboundFlow'
2019-12-09 18:21:41.772  INFO 18248 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {amqp:outbound-channel-adapter} as a subscriber to the 'inboundFlow.channel#1' channel
2019-12-09 18:21:41.772  INFO 18248 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application.inboundFlow.channel#1' has 1 subscriber(s).
2019-12-09 18:21:41.772  INFO 18248 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean 'inboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#2'; defined in: 'class path resource [com/mahesh/siuspdemo/adapter/InBoundStompAdapter.class]'; from source: 'bean method inboundFlow'

但是,我遇到异常,我丢失了队列中的前一个/两个消息。它处理剩余的消息。

假设我启动应用程序之前队列中有10条消息。启动应用程序后,即使日志显示已添加订户且bean已启动,但仍收到异常消息,发布异常消息,处理8/9消息。

例外是:org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.stompInputChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage

很明显,上下文还没有完全准备好处理消息,因此是异常。 但是日志消息具有误导性。

我的第一个问题:

  1. 那么,添加订户并启动bean到底意味着什么?这是否意味着已经设置了所有内容,但仍需要准备上下文来处理消息?

为了克服这一点,正如许多帖子中所建议的,我使用控制总线来启动适配器。该代码是:

......
@Component
public class ApplicationLifeCycle implements SmartLifecycle {

    @Autowired
    private MessageChannel controlBusChannel;

    @Override
    public void start() {
        System.out.println("Service starting...");
        controlBusChannel.send(new GenericMessage<>("@stompInboundChannelAdapter.start()"));
    }
.....

我创建了public class ApplicationLifeCycle implements SmartLifecycle,认为会很方便。

我的第二个问题是:

  1. 这是使用控制总线处理适配器启动/停止的正确/最佳方法吗?如果这不是正确的方法,请告诉我正确的方法。

谢谢,

Mahesh

spring-integration spring-integration-dsl
1个回答
0
投票

我认为这是您其他问题的延续:IntegrationFlow Amqp Channel Adapter is not working in handle()

那里有这个:

@Bean
public StompInboundChannelAdapter stompInboundChannelAdapter() {
    StompInboundChannelAdapter adapter = new StompInboundChannelAdapter(stompSessionManager(), "/queue/myQueue");
    adapter.setOutputChannel(stompInputChannel());
    adapter.setPayloadType(ByteString.class);
    return adapter;
}

您不在此处显示。

问题在于,您然后在IntegrationFlow中使用了相同的定义。事实证明,StompInboundChannelAdapter bean较早启动,然后处理了IntegationFlow,并且预订了.transform(inBoundStompMsgTransformer::transform)来处理传入消息。

因此,如果您从@Bean中删除了stompInboundChannelAdapter(),它将为您正常工作。我稍后再看为什么MessageProducerSupport更早启动,然后IntegrationFlow s ...

© www.soinside.com 2019 - 2024. All rights reserved.