我想使用Spring Integration公开一个简单的Web服务,该服务将传入消息推入ActiveMQ并立即响应。我的首选解决方案是MarshallingWebServiceInboundGateway通过IntegrationFlow连接到Jms.outboundAdapter。在Gateway和IntegrationFlow片段下方。问题在于适配器不提供网关期望的响应(duh)。我从服务获得的响应为空202,延迟约为1500ms。这是由我在TRACE日志中看到的回复超时引起的:
"2020-04-14 17:17:50.101 TRACE 26524 --- [nio-8080-exec-6] o.s.integration.core.MessagingTemplate : Failed to receive message from channel 'org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@518ffd27' within timeout: 1000"
在任何地方都没有硬异常。另一个问题是我自己无法生成响应。使用适配器的.handle之后,我无法向IntegrationFlow添加任何内容。
最有可能的正确方法是在两端使用网关,但这是不可能的。我无法等待响应,直到队列中的消息被消耗和处理。
'''
@Bean
public MarshallingWebServiceInboundGateway greetingWebServiceInboundGateway() {
MarshallingWebServiceInboundGateway inboundGateway = new MarshallingWebServiceInboundGateway(
jaxb2Marshaller()
);
inboundGateway.setRequestChannelName("greetingAsync.input");
inboundGateway.setLoggingEnabled(true);
return inboundGateway;
}
@Bean
public IntegrationFlow greetingAsync() {
return f -> f
.log(LoggingHandler.Level.INFO)
.handle(Jms.outboundAdapter(this.jmsConnectionFactory)
.configureJmsTemplate(c -> {
c.jmsMessageConverter(new MarshallingMessageConverter(jaxb2Marshaller()));
})
.destination(JmsConfig.HELLO_WORLD_QUEUE));
}
'''
逻辑和假设完全正确:单向handle()
和类似于Jms.outboundAdapter()
的您不能返回。
但是您的问题是您完全错过了Spring Integration中的头等公民之一-MessageChannel
。重要的是要理解,即使在像您这样的流中,端点之间也存在通道(DSL方法)-隐式(DirectChannel
)(例如您的情况)或显式:当您在两者之间使用channel()
并可以放置在其中时任何可能的实现:https://docs.spring.io/spring-integration/docs/5.3.0.M4/reference/html/dsl.html#java-dsl-channels
当您可以将相同的消息发送到多个订阅的端点时,PublishSubscribeChannel
(JMS规范中的topic
是最重要的通道实现之一:https://docs.spring.io/spring-integration/docs/5.3.0.M4/reference/html/core.html#channel-implementations-publishsubscribechannel
在您的情况下,拳头订户应为您现有的单向Jms.outboundAdapter()
。还有另一件事将产生响应并将其回复到replyChannel
标头中。
为此,Java DSL通过子流配置提供了一个不错的钩子:https://docs.spring.io/spring-integration/docs/5.3.0.M4/reference/html/dsl.html#java-dsl-subflows
因此,一些发布订阅者示例可能是这样的:
.publishSubscribeChannel(c -> c
.subscribe(sf -> sf
.handle(Jms.outboundAdapter(this.jmsConnectionFactory))))
.handle([PRODUCE_RESPONSE])