Alpakka - 处理消息时的回调

问题描述 投票:0回答:1

如何在每条消息成功发送到JMS或失败后进行回调?

val jmsSink = JmsSink.textSink(
    JmsSinkSettings(connectionFactory).withQueue("My_Queue")
)
Source(Stream.from(1))
    .map(_.toString)
    .runWith(jmsSink)

更具体的例子

// creating a sourceQueue which is bound to jmsSink
val sourceQueue: SourceQueueWithComplete[String] =
    Source.queue[String](bufferSize, OverflowStrategy.backpressure)
        .to(jmsSink)
        .run()

客户正在向sourceQueue发送物品:

val result: Future[QueueOfferResult] = sourceQueue offer "my-item"

val result是将项目插入sourceQueue的结果,但这并不意味着它已被发送到JMS。当项目经过接收进程并插入JMS队列时,我需要触发一个事件。

scala jms akka-stream alpakka
1个回答
1
投票

调用每个成功消息的回调(如果通过“回调”表示返回Unit的函数)的一种方法是创建订阅同一JMS队列的相应Source,并使用runForeach

val jmsSink = JmsSink.textSink(
  JmsSinkSettings(connectionFactory).withQueue("My_Queue")
)

Source(...)
  .map(_.toString)
  .runWith(jmsSink)

val jmsSource = JmsSource(
  JmsSourceSettings(connectionFactory).withQueue("My_Queue")
)

jmsSource.runForeach(println)

上面的示例打印通过接收器发布到队列的每条消息。

至于错误,如果抛出异常,当前您的流将关闭。例如,如果出现抛出异常,你想要打印异常并恢复流而不是终止它,你可以将supervision strategy附加到你原来的Source

val decider: Supervision.Decider = {
  case e: Exception =>
    println(s"Exception thrown: ${e.getMessage}")
    Supervision.Resume
}

val flow = Flow[String]
  .withAttributes(ActorAttributes.supervisionStrategy(decider))

Source(...)
  .map(_.toString)
  .via(flow)
  .runWith(jmsSink)

val jmsSource = ...
© www.soinside.com 2019 - 2024. All rights reserved.