如何在每条消息成功发送到JMS或失败后进行回调?
val jmsSink = JmsSink.textSink(
JmsSinkSettings(connectionFactory).withQueue("My_Queue")
)
Source(Stream.from(1))
.map(_.toString)
.runWith(jmsSink)
更具体的例子
// creating a sourceQueue which is bound to jmsSink
val sourceQueue: SourceQueueWithComplete[String] =
Source.queue[String](bufferSize, OverflowStrategy.backpressure)
.to(jmsSink)
.run()
客户正在向sourceQueue
发送物品:
val result: Future[QueueOfferResult] = sourceQueue offer "my-item"
val result
是将项目插入sourceQueue
的结果,但这并不意味着它已被发送到JMS。当项目经过接收进程并插入JMS队列时,我需要触发一个事件。
调用每个成功消息的回调(如果通过“回调”表示返回Unit
的函数)的一种方法是创建订阅同一JMS队列的相应Source
,并使用runForeach
:
val jmsSink = JmsSink.textSink(
JmsSinkSettings(connectionFactory).withQueue("My_Queue")
)
Source(...)
.map(_.toString)
.runWith(jmsSink)
val jmsSource = JmsSource(
JmsSourceSettings(connectionFactory).withQueue("My_Queue")
)
jmsSource.runForeach(println)
上面的示例打印通过接收器发布到队列的每条消息。
至于错误,如果抛出异常,当前您的流将关闭。例如,如果出现抛出异常,你想要打印异常并恢复流而不是终止它,你可以将supervision strategy附加到你原来的Source
:
val decider: Supervision.Decider = {
case e: Exception =>
println(s"Exception thrown: ${e.getMessage}")
Supervision.Resume
}
val flow = Flow[String]
.withAttributes(ActorAttributes.supervisionStrategy(decider))
Source(...)
.map(_.toString)
.via(flow)
.runWith(jmsSink)
val jmsSource = ...