如何以惯用的方式向我的Akka流添加错误日志记录?

问题描述 投票:0回答:1

我目前正在运行类似于以下内容的Akka流设置:

                 ┌───────────────┐                 
┌─────────────┐  │┌─────────────┐│                 
│REST endpoint│──▶│Queue source ││                 
└─────────────┘  │└──────╷──────┘│                 
                 │┌──────▼──────┐│                 
                 ││   Flow[T]   ││                 
                 │└──────╷──────┘│                 
                 │┌──────▼──────┐│  ┌─────────────┐
                 ││  KafkaSink  │├─▶│ Kafka topic │
                 │└─────────────┘│  └─────────────┘
                 └───────────────┘                 

虽然此工作很好,但我想对生产系统有所了解,即是否有错误以及发生了哪些错误。例如,我将KafkaSink包装到RestartSink.withBackoff中,并将以下属性应用于包装的接收器:

private val decider: Supervision.Decider = {
  case x =>
    log.error("KafkaSink encountered an error and will stop", x)
    Supervision.Stop
}

Flow[...]
  .log("KafkaSink")
  .to(Producer.plainSink(...))
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
  .addAttributes(
    ActorAttributes.logLevels(
      onElement = Logging.DebugLevel,
      onFinish = Logging.WarningLevel,
      onFailure = Logging.ErrorLevel
    )
  )

这的确为我提供了一些见解,例如我将收到一条日志消息,说明下游已关闭,以及通过添加的supervisionStrategy发生的异常。

但是,此解决方案感觉有点像变通办法(例如,将异常记录为监督策略),并且也无法提供对RestartWithBackoffSink行为的任何见解。我当然可以为该类启用DEBUG级日志记录,但是再次,这感觉像是在生产环境中要解决的方法。

长话短说:

  • 我尝试了解Akka流中发生的错误的方式是否存在明显的缺陷
  • 是否有更好/惯用的方式将日志记录添加到生产中的Akka流中?>

我当前正在运行类似于以下内容的Akka流设置:┌─────────────┐┐───────────┐│┌ ──────────────┐││REST端点│──▶│队列...

scala logging akka akka-stream
1个回答
0
投票

我想你快到了!

© www.soinside.com 2019 - 2024. All rights reserved.