我目前正在运行类似于以下内容的Akka流设置:
┌───────────────┐
┌─────────────┐ │┌─────────────┐│
│REST endpoint│──▶│Queue source ││
└─────────────┘ │└──────╷──────┘│
│┌──────▼──────┐│
││ Flow[T] ││
│└──────╷──────┘│
│┌──────▼──────┐│ ┌─────────────┐
││ KafkaSink │├─▶│ Kafka topic │
│└─────────────┘│ └─────────────┘
└───────────────┘
虽然此工作很好,但我想对生产系统有所了解,即是否有错误以及发生了哪些错误。例如,我将KafkaSink
包装到RestartSink.withBackoff
中,并将以下属性应用于包装的接收器:
private val decider: Supervision.Decider = {
case x =>
log.error("KafkaSink encountered an error and will stop", x)
Supervision.Stop
}
Flow[...]
.log("KafkaSink")
.to(Producer.plainSink(...))
.withAttributes(ActorAttributes.supervisionStrategy(decider))
.addAttributes(
ActorAttributes.logLevels(
onElement = Logging.DebugLevel,
onFinish = Logging.WarningLevel,
onFailure = Logging.ErrorLevel
)
)
这的确为我提供了一些见解,例如我将收到一条日志消息,说明下游已关闭,以及通过添加的supervisionStrategy
发生的异常。
但是,此解决方案感觉有点像变通办法(例如,将异常记录为监督策略),并且也无法提供对RestartWithBackoffSink
行为的任何见解。我当然可以为该类启用DEBUG
级日志记录,但是再次,这感觉像是在生产环境中要解决的方法。
长话短说:
我当前正在运行类似于以下内容的Akka流设置:┌─────────────┐┐───────────┐│┌ ──────────────┐││REST端点│──▶│队列...
我想你快到了!