我正在尝试通过服务器发送的事件通知所有用户该事件,但不知道为什么但不起作用。如果我使用队列,我能够通知单个用户,但是如果我更改配置以使用主题,它什么也不会做。您在下面看到的我正在使用队列并在jmsTopicTemplate中注释掉template.setPubSubDomain(true),然后将其发送给单个用户
@Bean
public Publisher<Message<LocationData>> jmsReactiveSource(ConnectionFactory connectionFactory) {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(connectionFactory)
.destination(JMSConfig.Location_TOPIC).
jmsMessageConverter(messageConverter()))
//.channel(new PublishSubscribeChannel(executor()))
// .channel(MessageChannels.flux())
.channel(MessageChannels.queue())
.toReactivePublisher();
}
您可以在https://github.com/haiderali22/spring-tracking-jms-sse-mongo-app处检查代码
很遗憾,我已经检查了您的项目,该项目仍然足够大,可以正常消化。但是从集成角度看,我应该是这样的:
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(connectionFactory)
.destination(JMSConfig.Location_TOPIC).
jmsMessageConverter(messageConverter())
.autoStartup(false)
.id("jmsMessageDrivenChannelAdapter"))
.toReactivePublisher();
toReactivePublisher()
已经自行注入通道。中间不需要任何其他操作。 autoStartup(false)
用于推迟订阅最终的Flux
。这样,直到最后Flux
发生订阅,您才可以从JMS中提取消息。
您稍后将在.id("jmsMessageDrivenChannelAdapter"))
中使用的LocationService
:
public Flux<LocationData> watch() {
return Flux.from(jmsReactiveSource)
.map(Message::getPayload)
.doOnSubscribe(s -> jmsMessageDrivenChannelAdapter.start());
}
这样,直到真正的订阅发生在Flux
中,您才可以从JMS开始提取。
JMS主题与此SSE主题无关。
如果可以使您的项目更简单,我会再次尝试。我对龙目岛不熟悉,但是...