我有一个协议,类型为protocol: Flow[ByteString, ByteString, NotUsed]
。流中的元素是用户发送的消息,流中的元素是服务器的响应。加密层的类型为encryption: BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed]
(Akka流以这种格式提供例如TLS
)。通过这种架构,通过加密层管道传输协议可以归结为:
TLS
问题是我希望能够基于从客户端收到的第一个protocol.join(tlsEncryptionLayer): Flow[ByteString, ByteString, NotUsed]
protocol.join(noEncryptionLayer): Flow[ByteString, ByteString, NotUsed]
来选择加密层(又名BidiFlow
为join
-ed]]
此问题看起来与ByteString
的def lazyInit[I, O, M](flowFactory: (I) ⇒ Future[Flow[I, O, M]], fallback: () ⇒ M): Flow[I, O, M]
非常相似。例如,def lazyInit[I, O, M](flowFactory: (I) ⇒ Future[Flow[I, O, M]], fallback: () ⇒ M): Flow[I, O, M]
上的Flow
会让我不粘:
lazyInit
考虑这样的假设功能,当第一个字节为BidiFlow
时,我将能够编写使用TLS的代码,否则将不加密。假设上面的功能,下面的代码类型检查:
object BidiFlow {
// This method doesn't exist, but if it did, my problem would be solved
def lazyInit[I1, O1, I2, O2](
bidiFactory: (I1) ⇒ BidiFlow[I1, O1, I2, O2, NotUsed]
): BidiFlow[I1, O1, I2, O2, NotUsed] = ???
}
[发布并查看了更多文档之后,我想出了一个半解决方案:由于0x16
最终还是要由val encryptionLayer = BidiFlow.lazyInit[ByteString, ByteString, ByteString, ByteString](
bidiFactory = {
// If the very first byte is `0x16`, we are dealing with TLS
case bstr if bstr.head == 0x16 =>
TLS(createSSLEngine = ???, closing = ???)
.reversed
.atop(BidiFlow.fromFunctions(
(inbound: TLSProtocol.SslTlsInbound) => inbound match {
case TLSProtocol.SessionBytes(_, bytes) => bytes
case _ => ??? // TODO: good error handling
},
(byteString: ByteString) => TLSProtocol.SendBytes(byteString)
))
// If the first byte is anything else, assume no-encryption
case _ => BidiFlow.identity[ByteString, ByteString]
}
)
-ed到BidiFlow
,因此我们可以想到问题所在用join
表示。
Flow
尽管这仍然很糟糕-加密层不再像以前那样整齐地塞在Flow
中。相反,我需要重组周围的代码以适应加密层。 :(