我应该如何测试akka-streams RestartingSource的用法

问题描述 投票:0回答:1

我正在开发一个具有几个长时间运行的流的应用程序,它在其中订阅有关某个实体的数据并处理该数据。这些流应该是24/7,所以我们需要处理故障(网络问题等)。

为此,我们将源包装在RestartingSource中。

我现在正在尝试验证此行为,并且看起来像在起作用,但是我正在努力创建一个测试,在其中输入一些数据,验证它是否正确处理,然后发送错误,并验证它之后重新连接并继续处理。

我将其简化为这种最小的情况:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{RestartSource, Sink, Source}
import akka.stream.testkit.TestPublisher
import org.scalatest.concurrent.Eventually
import org.scalatest.{FlatSpec, Matchers}

import scala.concurrent.duration._
import scala.concurrent.ExecutionContext

class MinimalSpec extends FlatSpec with Matchers with Eventually {

  "restarting a failed source" should "be testable" in {
    implicit val sys: ActorSystem = ActorSystem("akka-grpc-measurements-for-test")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    implicit val ec: ExecutionContext = sys.dispatcher

    val probe = TestPublisher.probe[Int]()
    val restartingSource = RestartSource
      .onFailuresWithBackoff(1 second, 1 minute, 0d) { () => Source.fromPublisher(probe) }

    var last: Int = 0
    val sink = Sink.foreach { l: Int => last = l }

    restartingSource.runWith(sink)

    probe.sendNext(1)

    eventually {
      last shouldBe 1
    }

    probe.sendNext(2)

    eventually {
      last shouldBe 2
    }

    probe.sendError(new RuntimeException("boom"))

    probe.expectSubscription()

    probe.sendNext(3)

    eventually {
      last shouldBe 3
    }

  }
}

此测试在带有eventually的最后一个Last failure message: 2 was not equal to 3块上始终失败。我在这里想念什么?

编辑:akka版本为2.5.31

scala akka akka-stream akka-testkit
1个回答
0
投票

我看了TestPublisher code之后就知道了。它的订阅为lazy val。因此,当RestartSource检测到错误并再次执行工厂方法() => Source.fromPublisher(probe)时,它将获得新的Source,但subscriptionprobe仍指向旧的Source。更改代码以初始化新的SourceTestPublisher均可。

© www.soinside.com 2019 - 2024. All rights reserved.