根据发送的消息恢复Akka的要求

问题描述 投票:2回答:1

我正在通过ask向演员发送不同的消息。超时时,我想提供一个默认值,该默认值对于发送给参与者的消息是不同的。

由于超时异常总是相同的,所以我不能在recover中使用它来返回不同的默认值,我需要发送原始消息。

如何做到这一点。

代码示例:

      val storageActorProxy = Flow[ByteString]
        .via(Framing.lengthField(TCPMessage.sizeFieldLength, TCPMessage.sizeFieldIndex, Int.MaxValue))
        .map(TCPMessage.decode)
        .ask[OperationResponse](storageActor)
        //TODO: looking for this recover; non-existent AFAIK
        .customRecover { 
            case Op1 => DefaultResponseA()
            case Op2 => DefaultResponseB()
        }
        .map(TCPMessage.encode(_).toByteString)

scala akka future actor akka-stream
1个回答
0
投票

Akka的ask方法实际上很容易重新创建-它只是mapAsync,带有一些额外的逻辑,可以在演员死后(see the code)带来更好的错误。因此,只需手动使用mapAsync,即可恢复询问错误。

val storageActorProxy = Flow[ByteString]
  .via(Framing.lengthField(TCPMessage.sizeFieldLength, TCPMessage.sizeFieldIndex, Int.MaxValue))
  .map(TCPMessage.decode)
  .mapAsync { decodedMessage =>
     (storageActor ? decodedMessage).recover {
       case Op1 => DefaultResponseA()
       case Op2 => DefaultResponseB()
     }
  }
  .map(TCPMessage.encode(_).toByteString)
© www.soinside.com 2019 - 2024. All rights reserved.