我正在尝试使用Alpakka实现与AMQP代理连接的非常简单的服务。我只是希望它在被推送到给定的交换/主题时将消息从队列中作为流消费。
在我的测试中,一切似乎都运行良好,但当我尝试启动我的服务时,我意识到我的流只消耗了我的消息一次然后退出。
基本上我使用的是Alpakka文档中的代码:
def consume()={
val amqpSource = AmqpSource.committableSource(
TemporaryQueueSourceSettings(connectionProvider, exchangeName)
.withDeclaration(exchangeDeclaration)
.withRoutingKey(topic),
bufferSize = prefetchCount
)
val amqpSink = AmqpSink.replyTo(AmqpReplyToSinkSettings(connectionProvider))
amqpSource.mapAsync(4)(msg => onMessage(msg)).runWith(amqpSink)
}
我试图每秒安排consume()
执行,但我遇到了OutOfMemoryException
问题。
有没有正确的方法使这段代码作为无限循环运行?
如果你想在失败或被取消时重新启动Source
,请用RestartSource.withBackoff
包装它。