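I have a Spring Integration flow with a StompInboundChannelAdapter that listens for incoming messages from one destination, and another integration flow with a StompMessageHandler that sends messages to a different destination. Can I use the same StompSessionManager instance for both, or should each flow have its own instance? The STOMP server is the same.
I tried a singleton instance with the following configuration, and it seems to work, but I don't know whether this is the right approach or whether I am missing something: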
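import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.stomp.ReactorNettyTcpStompSessionManager;
import org.springframework.integration.stomp.StompSessionManager;
import org.springframework.messaging.simp.stomp.ReactorNettyTcpStompClient;
import org.springframework.messaging.simp.stomp.StompHeaders;

@Configuration
public class StompSessionManagerConfiguration {

    @Value("${host}")
    private String host;

    @Value("${port}")
    private Integer port;

    @Value("${login}")
    private String login;

    @Value("${passcode}")
    private String passcode;

    // Singleton session manager shared by the inbound and outbound flows.
    @Bean
    public StompSessionManager stompSessionManager() {
        ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient(host, port);
        ReactorNettyTcpStompSessionManager stompSessionManager =
                new ReactorNettyTcpStompSessionManager(stompClient);
        stompSessionManager.setConnectHeaders(connectHeaders());
        return stompSessionManager;
    }

    // Credentials sent in the STOMP CONNECT frame.
    public StompHeaders connectHeaders() {
        StompHeaders connectHeaders = new StompHeaders();
        connectHeaders.setLogin(login);
        connectHeaders.setPasscode(passcode);
        return connectHeaders;
    }
}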
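
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.stomp.StompSessionManager;
import org.springframework.integration.stomp.inbound.StompInboundChannelAdapter;

@Configuration
public class IncomingMessageFlowConfiguration {

    @Autowired
    private StompSessionManager stompSessionManager;

    @Bean
    public IntegrationFlow incomingMessageFlow() {
        return IntegrationFlow.from(stompInboundChannelAdapter())
                .channel("incomingMessageChannel")
                .get();
    }

    // Subscribes to "incomingDestination" through the shared session manager.
    public StompInboundChannelAdapter stompInboundChannelAdapter() {
        StompInboundChannelAdapter adapter =
                new StompInboundChannelAdapter(stompSessionManager, "incomingDestination");
        adapter.setPayloadType(byte[].class);
        return adapter;
    }
}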
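
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.stomp.StompSessionManager;
import org.springframework.integration.stomp.outbound.StompMessageHandler;

@Configuration
public class OutgoingMessageFlowConfiguration {

    @Autowired
    private StompSessionManager stompSessionManager;

    @Bean
    public IntegrationFlow outgoingMessageFlow() {
        return IntegrationFlow.from("outgoingMessageChannel")
                .handle(stompMessageHandler())
                .get();
    }

    // Sends to "outgoingDestination" through the same shared session manager.
    public StompMessageHandler stompMessageHandler() {
        StompMessageHandler stompMessageHandler = new StompMessageHandler(stompSessionManager);
        stompMessageHandler.setDestination("outgoingDestination");
        return stompMessageHandler;
    }
}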
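
Yes. You can share a StompSessionManager between different endpoints. Its purpose is to manage a single client session connected to the STOMP broker.
We even have an integration test in the project in which a StompSessionManager is shared: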
@Bean
public StompSessionManager stompSessionManager() {
    // stompClient is defined elsewhere in the test configuration (not shown here).
    AbstractStompSessionManager stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient);
    stompSessionManager.setAutoReceipt(true);
    stompSessionManager.setRecoveryInterval(500);
    return stompSessionManager;
}

@Bean
public PollableChannel stompInputChannel() {
    return new QueueChannel();
}

// Inbound side: uses the shared session manager bean.
@Bean
public StompInboundChannelAdapter stompInboundChannelAdapter() {
    StompInboundChannelAdapter adapter =
            new StompInboundChannelAdapter(stompSessionManager(), "/topic/myTopic");
    adapter.setOutputChannel(stompInputChannel());
    return adapter;
}

// Outbound side: uses the same session manager bean.
@Bean
@ServiceActivator(inputChannel = "stompOutputChannel")
public MessageHandler stompMessageHandler() {
    StompMessageHandler handler = new StompMessageHandler(stompSessionManager());
    handler.setDestination("/topic/myTopic");
    handler.setConnectTimeout(1000);
    return handler;
}
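
To make the "single client session" point concrete, here is a toy model in plain Java. It does not use Spring: FakeSession, FakeSessionManager, InboundEndpoint and OutboundEndpoint are hypothetical stand-ins invented for this sketch, and the lazy connect-once logic is an assumption about the general idea, not a copy of how the real StompSessionManager is implemented. It only illustrates that two endpoints holding the same manager end up on one session:

```java
import java.util.ArrayList;
import java.util.List;

final class SharedSessionSketch {

    /** Hypothetical stand-in for a connected STOMP session; not a Spring type. */
    static final class FakeSession {
        final List<String> subscriptions = new ArrayList<>();
        final List<String> sentFrames = new ArrayList<>();
    }

    /** Hypothetical manager: opens the session lazily and reuses it for every caller. */
    static final class FakeSessionManager {
        private FakeSession session;
        private int connectCount;

        synchronized FakeSession connect() {
            if (session == null) {
                session = new FakeSession();
                connectCount++;
            }
            return session;
        }

        synchronized int connectCount() {
            return connectCount;
        }
    }

    /** Plays the role of the inbound adapter: subscribes on the shared session. */
    static final class InboundEndpoint {
        private final FakeSessionManager manager;
        private final String destination;

        InboundEndpoint(FakeSessionManager manager, String destination) {
            this.manager = manager;
            this.destination = destination;
        }

        void start() {
            manager.connect().subscriptions.add(destination);
        }
    }

    /** Plays the role of the outbound handler: sends on the shared session. */
    static final class OutboundEndpoint {
        private final FakeSessionManager manager;
        private final String destination;

        OutboundEndpoint(FakeSessionManager manager, String destination) {
            this.manager = manager;
            this.destination = destination;
        }

        void send(String payload) {
            manager.connect().sentFrames.add(destination + " <- " + payload);
        }
    }

    public static void main(String[] args) {
        // One manager instance is handed to both endpoints, as in the question.
        FakeSessionManager manager = new FakeSessionManager();
        new InboundEndpoint(manager, "incomingDestination").start();
        new OutboundEndpoint(manager, "outgoingDestination").send("hello");

        FakeSession session = manager.connect();
        System.out.println("connections opened: " + manager.connectCount());
        System.out.println("subscriptions: " + session.subscriptions);
        System.out.println("sent frames: " + session.sentFrames);
    }
}
```

In this model both endpoints call connect(), but only the first call opens a session, so the connection count stays at 1. If each flow had its own manager, you would get two sessions to the same broker, which also works but is not required.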