将消息流式传输到 WebSockets

问题描述 投票:0回答:1

我有一个 websocket 端点,允许客户端订阅与特定对话相关的消息。由于在任何给定时间都有多个服务实例,因此消息保存在 MongoDb 集合中。因此,每个 Web 套接字会话都需要启动一个轮询器,仅查询与其相关的消息。

多个 Websocket 会话也可能对同一对话感兴趣。

Websocket 客户端预计在握手期间提供对话 ID 查询参数(尽管我认为这可能会成为 URL 的一部分):

ws://my.api.com/wsflow/websocket?conversationId=12345

我创建了一个

ServerWebSocketContainer
并设置了一个
HandshakeInterceptor
来获取conversationId查询参数并将其保存为属性。然后,我设置一个 WebSocketHandlerDecorator 并覆盖
afterConnectionEstablished()
以创建一个轮询与该对话 ID 相关的消息的流。对于每条消息,它都会使用 WS 会话的 id 来丰富
sessionId
标头。然后它将其发送到另一个通道,该通道的另一端有一个 WebSocketOutboundMessageHandler。

为了防止重复消息并允许多个会话订阅同一对话,我在文档中包含了一个“sessions”数组。因此,轮询器查询将搜索具有所提供的会话 id 的消息,并且当前的 ws 会话 id 不在已经看到该消息的会话数组中。

这意味着入站适配器还必须有一个更新语句,该语句执行

push
操作,将当前会话 ID 附加到有效负载中的数组中。这感觉有点特殊,但是我想不出另一种不会增加太多复杂性的方法。

代码如下所示。它似乎有效(至少在本地),但我不确定是否有更自然的方法来做到这一点。我还没有确定我是否真的需要包含

useFlowIdAsPrefix()
以及这是否会对
afterConnectionClosed()
调用产生影响,该调用只是执行
flowContext.remove()

@Bean
public ServerWebSocketContainer serverContainer(MongoTemplate mongoTemplate, IntegrationFlowContext flowContext) {
    ServerWebSocketContainer container = new ServerWebSocketContainer("/wsflow").withSockJs(
        new SockJsServiceOptions().setHeartbeatTime(5000L).setTaskScheduler(new TaskSchedulerBuilder().build()));
    container.setInterceptors(new HandshakeInterceptor() {
        @Override
        public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
                WebSocketHandler wsHandler, Exception exception) {}
        @Override
        public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
                WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
            String conversationId = ((ServletServerHttpRequest)request).getServletRequest().getParameter("conversationId");
            attributes.put("conversationId", conversationId);       
            return true;
        }
    });
    container.setDecoratorFactories(handler -> new WebSocketHandlerDecorator(handler) {
        @Override
        public void afterConnectionEstablished(WebSocketSession session) throws Exception {
            super.afterConnectionEstablished(session);
            String conversationId = session.getAttributes().get("conversationId").toString();
            IntegrationFlow f = IntegrationFlow.from(MongoDb.inboundChannelAdapter(mongoTemplate.getMongoDatabaseFactory(), 
                Query.query(Criteria.where("conversationId").is(conversationId).and("sessions").nin(session.getId())))
                .collectionName("test").entityClass(Document.class)
                    .update(new Update().push("sessions", session.getId())), 
                        p -> p.poller(pm -> pm.fixedDelay(1000L)))
            .split()
            .enrichHeaders(s -> s.header(SimpMessageHeaderAccessor.SESSION_ID_HEADER, session.getId()))
            .channel("wsMessages")
            .get();
            flowContext.registration(f).id(session.getId()).useFlowIdAsPrefix().register();
        }
        @Override
        public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
            super.afterConnectionClosed(session, closeStatus);
            flowContext.remove(session.getId());
        }
    });
    return container;
}

@Bean
public MessageHandler webSocketOutboundAdapter(ServerWebSocketContainer container) {
    return new WebSocketOutboundMessageHandler(container);  
}

@Bean
public IntegrationFlow wsOut(MessageHandler webSocketOutboundAdapter) {
    return IntegrationFlow.from("wsMessages")
        .handle(webSocketOutboundAdapter).get();
}

@Bean 
public QueueChannelSpec wsMessages() {
    return MessageChannels.queue();
}
mongodb spring-boot websocket spring-integration spring-integration-ws
1个回答
0
投票

既然你谈论持久性和不重复,我不认为你关于“已见过的会话”的想法有那么糟糕。

我想不出比你已经拥有的更好的解决方案。 因此,MongoDB 中有一个文档,它属于某个会话,并且具有已看到该消息的会话列表的属性。因此,您已经对其进行了足够的优化,不会从数据库中提取那些已处理的消息。即使重新启动后,你仍然会很好。但是,您可能还需要考虑从该列表中删除已关闭的会话。我不确定来自同一客户端的新会话是否会具有相同的

id
来永远保留它。

© www.soinside.com 2019 - 2024. All rights reserved.