无法通过SSE向所有连接发送通知

问题描述 投票:0回答:1

我正在尝试通过服务器发送的事件通知所有用户该事件,但不知道为什么但不起作用。如果我使用队列,我能够通知单个用户,但是如果我更改配置以使用主题,它什么也不会做。您在下面看到的我正在使用队列并在jmsTopicTemplate中注释掉template.setPubSubDomain(true),然后将其发送给单个用户

@Bean
    public Publisher<Message<LocationData>> jmsReactiveSource(ConnectionFactory connectionFactory) {

        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(connectionFactory)
                        .destination(JMSConfig.Location_TOPIC).
                        jmsMessageConverter(messageConverter()))
                //.channel(new PublishSubscribeChannel(executor()))                 
//              .channel(MessageChannels.flux())
                .channel(MessageChannels.queue())
                .toReactivePublisher();
    }

您可以在https://github.com/haiderali22/spring-tracking-jms-sse-mongo-app处检查代码

spring-integration
1个回答
0
投票

很遗憾,我已经检查了您的项目,该项目仍然足够大,可以正常消化。但是从集成角度看,我应该是这样的:

return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory)
                    .destination(JMSConfig.Location_TOPIC).
                            jmsMessageConverter(messageConverter())
                    .autoStartup(false)
                    .id("jmsMessageDrivenChannelAdapter"))
            .toReactivePublisher();

toReactivePublisher()已经自行注入通道。中间不需要任何其他操作。 autoStartup(false)用于推迟订阅最终的Flux。这样,直到最后Flux发生订阅,您才可以从JMS中提取消息。

您稍后将在.id("jmsMessageDrivenChannelAdapter"))中使用的LocationService

public Flux<LocationData> watch() {
    return Flux.from(jmsReactiveSource)
            .map(Message::getPayload)
            .doOnSubscribe(s -> jmsMessageDrivenChannelAdapter.start());
}

这样,直到真正的订阅发生在Flux中,您才可以从JMS开始提取。

JMS主题与此SSE主题无关。

如果可以使您的项目更简单,我会再次尝试。我对龙目岛不熟悉,但是...

© www.soinside.com 2019 - 2024. All rights reserved.