Akka Streams KillSwitch in alpaca jms

问题描述 投票:1回答:1

我有一个场景,我使用alpakka启动多个jmsSource(针对不同的队列)。我还需要在任何时候分离队列。所以我将KillSwitch添加到jms akka流中,如下所示: -

trait MessageListener  {

  lazy val jmsPipeline = jmsSource
    .map { x => log.info(s"Received message ${x} from ${queue}"); x }
    .viaMat(KillSwitches.single)(Keep.right)
    .toMat(Sink.foreach { x => pipelineActorRef ! PreProcessorMessage(x) })
    (Keep.both)
    .run()

   def start(): Unit = {
             log.info("Invoking listener : {}", queue)
             jmsPipeline
             log.info("listener : {} started", queue)
          }
  def stop():Unit  =     jmsPipeline._1.shutdown()

  def queue: String

}

object ListenerA extends MessageListener {
  override def queue: String = "Queue_A"
}

object ListenerB extends MessageListener {
  override def queue: String = "Queue_B"
} 

.. 等等。

启动应用程序后,所有队列都已连接并正常工作。但是当我尝试使用stop方法分离队列时,并非所有队列都断开连接并且行为是随机的。我还检查了所有听众的killSwitch是不同的。

有人可以告诉我这里出了什么问题吗?

scala akka-stream alpakka
1个回答
0
投票

您的日志支持您使用不同的流连接到多个队列的错觉,但您有多个可能连接到同一队列的流。在两个侦听器对象中,记录器记录重写的queue名称,但此队列名称不用于配置jmsSource

你没有显示jmsSource的定义;显然它被定义在MessageListener特征之外的某个地方,在这种情况下,ListenerAListenerB都使用相同的jmsSource。换句话说,虽然ListenerAListenerB有不同的jmsPipeline实例(这就是为什么杀伤开关不同),这两个jmsPipeline实例都来自相同的jmsSource实例(除非jmsSourcedef,在每次调用时创建不同的Source ,但即使是这种情况,基本问题仍然存在:queue未在配置中使用)。

在Alpakka中,JMS队列在JmsSourceSettings上配置,因此jmsSource可能如下所示:

val jmsSource: Source[String, NotUsed] = JmsSource.textSource(
  JmsSourceSettings(connectionFactory).withBufferSize(10).withQueue("MyQueue")
)                        // the queue is configured here ^

例如,当调用ListenerA.start()时,将记录以下内容:

Invoking listener : Queue_A
listener : Queue_A started

同样,上述日志语句中的"Queue_A"def queue: String中被覆盖的ListenerA成员的值;它不一定是在jmsSource中实际配置的队列(上例中的"MyQueue")。与ListenerB和你在map组合中登录的消息一样。

一个简单的解决方法是将jmsSource及其JmsSourceSettings的定义移到MessageListener特性中,并在这些设置中实际使用queue

© www.soinside.com 2019 - 2024. All rights reserved.