如果出现异常,我正在使用 RestartSource.onFailuresWithBackoff 重新启动源,但如果收到某种异常类型,我想停止(取消)重新启动。例如:
RestartSource
.onFailuresWithBackoff(
minBackoff = 1,
maxBackoff = 5,
randomFactor = 0.2,
maxRestarts = 3
) { () =>
val responseFuture = doSomeAsyncTask().recover {
case SomeSpecialError =>
// I want to quit from the restarts
case NonFatal(ex) =>
// Re-throw so that the Source is restarted
throw ex
}
Source
.future(responseFuture)
.mapAsync(parallelism = 1)(Future.successful(_))
}
我尝试在包装的 Source 和 RestartSource 上设置监督策略,但事件从未到达它。因此,尝试在 Sink 操作符上执行此操作的解释相同。
从
RestartSource.onFailuresWithBackoff
文档中,您所需要的只是让源完成(不发出任何内容)以防止发生重新启动。
实现此目的的一种方法是,如果
doSomeAsyncTask
导致 Future[T]
,则将其映射到 Future[Option[T]]
,然后将显着的失败恢复为成功的 None
。然后在流源中:
例如:
RestartSource.onFailuresWithBackoff(
// yada yada yada
) { () =>
val baseFuture = doSomeAsyncTask().map(Option(_))
val tweakedFuture = baseFuture.recoverWith {
case SomeSpecialError => Future.successful(None)
case NonFatal(e) => baseFuture // including for clarity
}
Source.future(tweakedFuture)
.mapConcat(_.toList) // swallows the None arising from `SomeSpecialError`
// the mapAsync in your question is pointless, so I've omitted it,
// but if it's a placeholder for something else, you'd put it here
}
上面假设
doSomeAsyncTask()
永远不会导致成功的 null
,但由于 null
不应该通过 Akka Stream 传递,并且您没有处理它,这可能是一个相当安全的假设。
我认为正确的处理方法是使用
RestartSettings
:
private val restartSettings = RestartSettings(
minBackoff = 3.seconds,
maxBackoff = 30.seconds,
randomFactor = 0.2
)
.withMaxRestarts(3, 1.hour) // should always reach the 3 counts within 1 hour
.withRestartOn {
case sse: SomeSpecialError =>
log error ("Didn't restart because of SomeSpecialError", sse)
false // don't restart
case _ =>
true // restart
}
RestartSource.onFailuresWithBackoff(restartSettings) { () =>
Source
.future(responseFuture)
.mapAsync(parallelism = 1)(Future.successful(_))
}