How do I throw an exception in an Akka stream?

Question (votes: 1, answers: 1)

I want to throw an exception inside a stream, like this:

  Source.empty
      .map {
        throw new RuntimeException("Failed")
      }
      .runWith(Sink.foreach(println))
      .onComplete {
        case Success(_) =>
          println()
        case Failure(e) =>
          println(s"Thrown ${e.getMessage}")
      }  

The exception never reaches the onComplete callback. Instead, the program prints:

Exception in thread "main" java.lang.RuntimeException: Failed
    at com.sweetsoft.App$.main(App.scala:30)
    at com.sweetsoft.App.main(App.scala) 

How can I throw an exception so that it stops the stream and shows up at the end, in onComplete?
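A note on the trace above: it points at App.main rather than at a stream thread, which suggests the exception is thrown while the argument to map is being built, before the stream runs. The sketch below reproduces that effect without Akka. `Stage` and the `EagerThrowDemo` object are hypothetical stand-ins for a stream operator, not Akka APIs.

```scala
import scala.util.Try

object EagerThrowDemo {
  // Hypothetical stand-in for an operator like `map`: it only stores the
  // function and calls it later, once per element.
  final class Stage[A, B](f: A => B) {
    def process(a: A): Try[B] = Try(f(a))
  }

  // `{ throw ... }` is a block of type Nothing, not a function literal,
  // so it is evaluated while the constructor argument is being built.
  def buildEager(): Try[Stage[Int, Int]] =
    Try(new Stage[Int, Int]({ throw new RuntimeException("Failed") }))

  // `_ => throw ...` is a function literal: nothing is thrown until an
  // element is actually processed.
  def buildDeferred(): Stage[Int, Int] =
    new Stage[Int, Int](_ => throw new RuntimeException("Failed"))

  def main(args: Array[String]): Unit = {
    println(s"eager construction failed: ${buildEager().isFailure}")
    println(s"deferred processing failed: ${buildDeferred().process(1).isFailure}")
  }
}
```

Under this reading, the snippet in the question has a second issue: Source.empty emits no elements, so even a proper function passed to map would never be called.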

scala akka akka-stream
1 Answer (2 votes)

Akka Streams has error handling built in, in the form of supervision strategies. A decider maps each exception to a directive such as Stop:

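import akka.stream.Supervision

// A decider maps each exception to a directive; Stop fails the stream.
val testSupervisionDecider: Supervision.Decider = {
  case ex: java.lang.RuntimeException =>
    println(s"some run time exception ${ex.getMessage}")
    Supervision.Stop
  case ex: Exception =>
    // Interpolate instead of passing two arguments, which would print a tuple
    println(s"Exception occurred and stopping stream: $ex")
    Supervision.Stop
}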

You can then attach the supervision decider to a stream:

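// Requires an implicit ActorSystem (or Materializer) and an ExecutionContext in scope.
import akka.stream.ActorAttributes
import akka.stream.scaladsl.{Sink, Source}
import scala.util.{Failure, Success}

val list = List.range(1, 100)

Source(list)
  .map { item =>
    // Even items fail; the decider then stops the stream
    if (item % 2 == 0) throw new RuntimeException(s"$item")
    else item
  }
  .withAttributes(ActorAttributes.supervisionStrategy(testSupervisionDecider))
  .runWith(Sink.foreach(println))
  .onComplete {
    case Success(_) =>
      println()
    case Failure(e) =>
      // The exception that stopped the stream arrives here
      println(s"Thrown ${e.getMessage}")
  }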
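
For the snippet in the question itself, a minimal sketch of one possible fix follows. It assumes Akka 2.6 or later, where an implicit ActorSystem supplies the materializer. The object name `ThrowInStream` and the helper `run` are made up for illustration. The two changes are a non-empty source, so that map is actually invoked, and a real function literal (`_ => throw ...`), so that the exception is raised inside the stream rather than while the stream is being built. Stopping is the default supervision behavior, so no custom decider should be needed for the failure to reach onComplete.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future
import scala.util.{Failure, Success}

object ThrowInStream {
  // Builds and runs the stream; the returned Future fails when map throws.
  def run()(implicit system: ActorSystem): Future[Done] =
    Source
      .single(1) // non-empty, so the function passed to map is called
      .map[Int](_ => throw new RuntimeException("Failed"))
      .runWith(Sink.foreach[Int](println))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("demo")
    import system.dispatcher // ExecutionContext for onComplete

    run().onComplete { result =>
      result match {
        case Success(_) => println("Completed")
        case Failure(e) => println(s"Thrown ${e.getMessage}")
      }
      system.terminate() // shut down so the JVM can exit
    }
  }
}
```

If the goal is only to produce a failed stream, Source.failed(new RuntimeException("Failed")) is an alternative that needs no map at all.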