我正在通过ask
向演员发送不同的消息。超时时,我想提供一个默认值,该默认值对于发送给参与者的消息是不同的。
由于超时异常总是相同的,所以我不能在recover
中使用它来返回不同的默认值,我需要发送原始消息。
如何做到这一点。
代码示例:
val storageActorProxy = Flow[ByteString]
.via(Framing.lengthField(TCPMessage.sizeFieldLength, TCPMessage.sizeFieldIndex, Int.MaxValue))
.map(TCPMessage.decode)
.ask[OperationResponse](storageActor)
//TODO: looking for this recover; non-existent AFAIK
.customRecover {
case Op1 => DefaultResponseA()
case Op2 => DefaultResponseB()
}
.map(TCPMessage.encode(_).toByteString)
Akka的ask
方法实际上很容易重新创建-它只是mapAsync
,带有一些额外的逻辑,可以在演员死后(see the code)带来更好的错误。因此,只需手动使用mapAsync
,即可恢复询问错误。
val storageActorProxy = Flow[ByteString]
.via(Framing.lengthField(TCPMessage.sizeFieldLength, TCPMessage.sizeFieldIndex, Int.MaxValue))
.map(TCPMessage.decode)
.mapAsync { decodedMessage =>
(storageActor ? decodedMessage).recover {
case Op1 => DefaultResponseA()
case Op2 => DefaultResponseB()
}
}
.map(TCPMessage.encode(_).toByteString)