Akka Streams RestartSource.onFailuresWithBackoff 停止条件

问题描述 投票:0回答:2

如果出现异常,我正在使用 RestartSource.onFailuresWithBackoff 重新启动源,但如果收到某种异常类型,我想停止(取消)重新启动。例如:

RestartSource
  .onFailuresWithBackoff(
    minBackoff = 1,
    maxBackoff = 5,
    randomFactor = 0.2,
    maxRestarts = 3
  ) { () => 
    val responseFuture = doSomeAsyncTask().recover {
      case SomeSpecialError =>
        // I want to quit from the restarts
      case NonFatal(ex) => 
        // Re-throw so that the Source is restarted
        throw ex
    }

    Source
      .future(responseFuture)
      .mapAsync(parallelism = 1)(Future.successful(_))
}

我尝试在包装的 Source 和 RestartSource 上设置监督策略,但事件从未到达它。因此,尝试在 Sink 操作符上执行此操作的解释相同。

scala akka-stream
2个回答
2
投票

RestartSource.onFailuresWithBackoff
文档中,您所需要的只是让源完成(不发出任何内容)以防止发生重新启动。

实现此目的的一种方法是,如果

doSomeAsyncTask
导致
Future[T]
,则将其映射到
Future[Option[T]]
,然后将显着的失败恢复为成功的
None
。然后在流源中:

  • 如果原来的 future 由于其他异常而失败,源将失败并重新启动
  • 如果最初的 future 因明显的异常而失败,我们会将其过滤掉,以便源完成而不发出任何内容
  • 如果原来的 future 成功了,我们会正常传递该值

例如:

RestartSource.onFailuresWithBackoff(
  // yada yada yada
) { () =>
  val baseFuture = doSomeAsyncTask().map(Option(_))
  val tweakedFuture = baseFuture.recoverWith {
    case SomeSpecialError => Future.successful(None)
    case NonFatal(e) => baseFuture  // including for clarity
  }

  Source.future(tweakedFuture)
    .mapConcat(_.toList)  // swallows the None arising from `SomeSpecialError`
    // the mapAsync in your question is pointless, so I've omitted it,
    // but if it's a placeholder for something else, you'd put it here
}

上面假设

doSomeAsyncTask()
永远不会导致成功的
null
,但由于
null
不应该通过 Akka Stream 传递,并且您没有处理它,这可能是一个相当安全的假设。


0
投票

我认为正确的处理方法是使用

RestartSettings
:

private val restartSettings = RestartSettings(
  minBackoff = 3.seconds,
  maxBackoff = 30.seconds,
  randomFactor = 0.2
)
  .withMaxRestarts(3, 1.hour)  // should always reach the 3 counts within 1 hour
  .withRestartOn {
    case sse: SomeSpecialError => 
      log error ("Didn't restart because of SomeSpecialError", sse)
      false  // don't restart
    case _ => 
      true  // restart
  }

RestartSource.onFailuresWithBackoff(restartSettings) { () =>
    Source
      .future(responseFuture)
      .mapAsync(parallelism = 1)(Future.successful(_))
}
© www.soinside.com 2019 - 2024. All rights reserved.