我正在尝试从akka.stream.scaladsl.Source[ByteString, Any]
读取前16个字节并返回[Array[Byte], Source[ByteString, Any]]
。
读取前16个字节后,我想照常流式传输其余的Source
。
用例:
Source[ByteString, Any]
是加密流,流中的前16个字节是初始化向量。我需要获取初始化向量才能解密流的其余部分。
这是我尝试过的:
Source.single(ByteString("This is my test string"))
.prefixAndTail(16).runWith(Sink.head)
我想要这样的东西,但是prefixAndTail
将元素数量作为输入。 元素数不是字节数。
[如果有任何建议,请告诉我。谢谢!
以下示例对您的用例进行了一些假设:
ByteString
中的第一个Source
元素始终包含16字节的初始化矢量(在此将其称为“键”)。第一个元素中的其余字节(即前16个字节以外的字节)可以用密钥解密。 (为简单起见,此示例将前三个字节视为键。)String
。val b1 = ByteString.fromString("abcdef")
val b2 = ByteString.fromString("ghijkl")
val b3 = ByteString.fromString("mnopqr")
val b4 = ByteString.fromString("stuvwx")
val byteStringSource = Source(Vector(b1, b2, b3, b4))
// The first value in the tuple argument is the ByteString key, the second is
// the encrypted ByteString. Returns the original encrypted ByteString and the
// decrypted String as a Some (or None if the decryption fails).
def decrypt(keyAndEncrypted: (ByteString, ByteString)): (ByteString, Option[String]) = {
// do fancy decryption stuff with the key
(keyAndEncrypted._2, Option(keyAndEncrypted._2.utf8String.toUpperCase))
}
val decryptionFlow = Flow.fromFunction(decrypt)
val decryptedSource: Source[(ByteString, Option[String]), NotUsed] =
byteStringSource
.prefixAndTail(1)
.map {
case (prefix, tail) =>
val (key, rest) = prefix.head.splitAt(3) // using head instead of headOption for simplicity
(key, Source(Vector(rest)).concat(tail))
}
.collect { case (key, bSource) => bSource.map(b => (key, b)) }
.flatMapConcat(identity)
.via(decryptionFlow)
decryptedSource.runForeach {
case (encrypted, decrypted) =>
println((encrypted.utf8String, decrypted))
}
运行以上命令会显示以下内容:
(def,Some(DEF))
(ghijkl,Some(GHIJKL))
(mnopqr,Some(MNOPQR))
(stuvwx,Some(STUVWX))
在此示例中,我将ByteString
中第一个Source
的前三个字节用作键。该初始ByteString
中的其余三个字节以Source
的其余部分(尾部)为前缀,然后对所得的Source
进行转换,以使键与每个ByteString
元素耦合。然后将Source
展平并通过Flow
解密。 Flow
返回原始加密的ByteString
和包含解密值的Option[String]
。
希望这至少会为您的使用案例提供一些启发和想法。