基于第一个输入元素选择一个BidiFlow

问题描述 投票:0回答:1

我有一个协议,类型为protocol: Flow[ByteString, ByteString, NotUsed]。流中的元素是用户发送的消息,流中的元素是服务器的响应。加密层的类型为encryption: BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed](Akka流以这种格式提供例如TLS)。通过这种架构,通过加密层管道传输协议可以归结为:

TLS

问题是我希望能够基于从客户端收到的第一个protocol.join(tlsEncryptionLayer): Flow[ByteString, ByteString, NotUsed] protocol.join(noEncryptionLayer): Flow[ByteString, ByteString, NotUsed] 来选择加密层(又名BidiFlowjoin -ed]]

到目前为止我的想法

此问题看起来与ByteStringdef lazyInit[I, O, M](flowFactory: (I) ⇒ Future[Flow[I, O, M]], fallback: () ⇒ M): Flow[I, O, M]非常相似。例如,def lazyInit[I, O, M](flowFactory: (I) ⇒ Future[Flow[I, O, M]], fallback: () ⇒ M): Flow[I, O, M]上的Flow会让我不粘:

lazyInit

考虑这样的假设功能,当第一个字节为BidiFlow时,我将能够编写使用TLS的代码,否则将不加密。假设上面的功能,下面的代码类型检查:

object BidiFlow {

  // This method doesn't exist, but if it did, my problem would be solved
  def lazyInit[I1, O1, I2, O2](
    bidiFactory: (I1) ⇒ BidiFlow[I1, O1, I2, O2, NotUsed]
  ): BidiFlow[I1, O1, I2, O2, NotUsed] = ???
}
scala akka akka-stream
1个回答
0
投票

[发布并查看了更多文档之后,我想出了一个半解决方案:由于0x16最终还是要由val encryptionLayer = BidiFlow.lazyInit[ByteString, ByteString, ByteString, ByteString]( bidiFactory = { // If the very first byte is `0x16`, we are dealing with TLS case bstr if bstr.head == 0x16 => TLS(createSSLEngine = ???, closing = ???) .reversed .atop(BidiFlow.fromFunctions( (inbound: TLSProtocol.SslTlsInbound) => inbound match { case TLSProtocol.SessionBytes(_, bytes) => bytes case _ => ??? // TODO: good error handling }, (byteString: ByteString) => TLSProtocol.SendBytes(byteString) )) // If the first byte is anything else, assume no-encryption case _ => BidiFlow.identity[ByteString, ByteString] } ) -ed到BidiFlow,因此我们可以想到问题所在用join表示。

Flow

尽管这仍然很糟糕-加密层不再像以前那样整齐地塞在Flow中。相反,我需要重组周围的代码以适应加密层。 :(

© www.soinside.com 2019 - 2024. All rights reserved.