拥有Alpakka的无限AMQP消费者

问题描述 投票:0回答:1

我正在尝试使用Alpakka实现与AMQP代理连接的非常简单的服务。我只是希望它在被推送到给定的交换/主题时将消息从队列中作为流消费。

在我的测试中,一切似乎都运行良好,但当我尝试启动我的服务时,我意识到我的流只消耗了我的消息一次然后退出。

基本上我使用的是Alpakka文档中的代码:

def consume()={
    val amqpSource = AmqpSource.committableSource(
      TemporaryQueueSourceSettings(connectionProvider, exchangeName)
        .withDeclaration(exchangeDeclaration)
        .withRoutingKey(topic),
      bufferSize = prefetchCount
    )

    val amqpSink = AmqpSink.replyTo(AmqpReplyToSinkSettings(connectionProvider))

    amqpSource.mapAsync(4)(msg => onMessage(msg)).runWith(amqpSink)
}

我试图每秒安排consume()执行,但我遇到了OutOfMemoryException问题。

有没有正确的方法使这段代码作为无限循环运行?

akka-stream alpakka
1个回答
1
投票

如果你想在失败或被取消时重新启动Source,请用RestartSource.withBackoff包装它。

© www.soinside.com 2019 - 2024. All rights reserved.