Spring Boot RabbitMQ Pub/Sub 模型,具有运行时队列创建/订阅和容器扩展功能

问题描述 投票:0回答:1

我有一个带有基本 WebSocketConfigurer 的 Spring Boot 应用程序。在较高级别上,对于每个已建立的连接,我希望该 websocket 会话“订阅”专用于该会话的“通道”。消费者只需获取订阅中传入的任何消息并将其在 Web 套接字会话上发送即可。

我将“订阅”、“订阅”、“频道”放在引号中,因为它们代表了一个高级发布/订阅概念,而不是特定于实现的。

我选择了 RabbitMQ 作为我的 Pub/Sub 模型。鉴于我们正在使用 Rabbit,“通道”将作为 Rabbit 中的队列存在。查看无数的示例和文档 - 它主要涵盖具有静态的、预定的队列集的场景。在这种情况下,我们可以简单地声明一个 RabbitListener 带注释的方法,并使用代码来处理 anno 中指定的队列上的消息。

在这种情况下,我必须在运行时创建这些订阅。我知道 AmqpAdmin 允许在运行时声明队列和绑定。就连接到队列而言,似乎有兔子“端点”和“侦听器容器”的概念。

在大多数示例中,“端点”隐藏在引擎盖下,我们只需创建一个 DirectMessageListenerContainer (或 SimpleMessageListenerContainer),使用注入的 ConnectionFactory 并将其传递给 setConnectionFactory,然后定义一个 MessageListener 并将其设置在容器上。

这就是我遇到的第一个问题。一个 MessageListenerContainer 只能有一个与其关联的 MessageListener 实例。我可以使用 AmqpAdmin 创建队列,并使用 addQueues() 将这些队列引用添加到容器,但随后添加的队列中的任何一个上的所有消息都将发送到同一个 MessageListener。

我希望有一个 MessageLisener 实例对一个队列感兴趣并引用一个 WebSocketSession。 handle 方法会将队列中的消息简单地推送到该 WebSocket。但是,如果我只能有一个 MessageListener,那么我只能在 MessageListener 上创建某种 ConcurrentHashMap,这样我就可以通过队列名称查找正确的 WebSocketSession(您可以从入站消息中获取队列名称)以便推送到正确的 WebSocketSession。

我的下一个想法是为每个连接的新 WebSocketSession 创建一个新的 MessageListenerContainer 以及一个新的 MessageListener。这将允许我创建一个新的 MessageListener 实例,该实例保存对 WebSocketSession 的引用,并专用于为 WebSocket 创建的一个队列获取。但现在这意味着创建许多容器,我不确定这是否是一个好的做法。可以创建多少个容器?贵吗?

我还没有深入研究 RabbitListenerEndpoint 是什么以及它与容器的关系。这似乎是我不应该直接与之交互的东西,因为它们似乎是由容器或其他东西管理的。但也许我的假设是错误的。

最终我希望为每个进来的 websocket 都有一个专门的“订阅”到“通道”(队列?)。我只是坚持找出正确使用 RabbitMQ 的正确实现细节。

期待有人建议 STOMP:我正在考虑将 Spring Stomp 消息代理与rabbit一起使用作为可能的替代方案。但这是一个单独的问题。

spring spring-boot rabbitmq spring-amqp spring-websocket
1个回答
0
投票

RabbitListenerEndpoint
只是基础设施用来为
@RabbitListener
方法创建消息侦听器适配器的工具。

您可以使该 bean 成为原型 bean(而不是默认的单例),然后从应用程序上下文中检索到的每个 bean 都会获得自己的侦听器容器。

但是,您是正确的,创建许多容器并不是一个好的做法。因此,在单个监听器内进行路由可能是这种情况的最佳方法。但是,您不需要为每个订阅者建立一个队列(除非订阅必须是持久的,并且能够在订阅者断开连接时获取发送的消息)。

© www.soinside.com 2019 - 2024. All rights reserved.