从akka.stream.scaladsl.Source读取第一个字节

问题描述 投票:0回答:1

我正在尝试从akka.stream.scaladsl.Source[ByteString, Any]读取前16个字节并返回[Array[Byte], Source[ByteString, Any]]

读取前16个字节后,我想照常流式传输其余的Source

用例:

Source[ByteString, Any]是加密流,流中的前16个字节是初始化向量。我需要获取初始化向量才能解密流的其余部分。

这是我尝试过的:

Source.single(ByteString("This is my test string"))
      .prefixAndTail(16).runWith(Sink.head)

我想要这样的东西,但是prefixAndTail将元素数量作为输入。 元素数不是字节数。

[如果有任何建议,请告诉我。谢谢!

scala akka akka-stream reactive-streams
1个回答
0
投票

以下示例对您的用例进行了一些假设:

  • ByteString中的第一个Source元素始终包含16字节的初始化矢量(在此将其称为“键”)。第一个元素中的其余字节(即前16个字节以外的字节)可以用密钥解密。 (为简单起见,此示例将前三个字节视为键。)
  • 解密后的值为String
val b1 = ByteString.fromString("abcdef")
val b2 = ByteString.fromString("ghijkl")
val b3 = ByteString.fromString("mnopqr")
val b4 = ByteString.fromString("stuvwx")

val byteStringSource = Source(Vector(b1, b2, b3, b4))

// The first value in the tuple argument is the ByteString key, the second is
// the encrypted ByteString. Returns the original encrypted ByteString and the
// decrypted String as a Some (or None if the decryption fails).
def decrypt(keyAndEncrypted: (ByteString, ByteString)): (ByteString, Option[String]) = {
  // do fancy decryption stuff with the key
  (keyAndEncrypted._2, Option(keyAndEncrypted._2.utf8String.toUpperCase))
}

val decryptionFlow = Flow.fromFunction(decrypt)

val decryptedSource: Source[(ByteString, Option[String]), NotUsed] =
  byteStringSource
    .prefixAndTail(1)
    .map {
      case (prefix, tail) =>
        val (key, rest) = prefix.head.splitAt(3) // using head instead of headOption for simplicity
        (key, Source(Vector(rest)).concat(tail))
    }
    .collect { case (key, bSource) => bSource.map(b => (key, b)) }
    .flatMapConcat(identity)
    .via(decryptionFlow)

decryptedSource.runForeach {
  case (encrypted, decrypted) =>
    println((encrypted.utf8String, decrypted))
}

运行以上命令会显示以下内容:

(def,Some(DEF))
(ghijkl,Some(GHIJKL))
(mnopqr,Some(MNOPQR))
(stuvwx,Some(STUVWX))

在此示例中,我将ByteString中第一个Source的前三个字节用作键。该初始ByteString中的其余三个字节以Source的其余部分(尾部)为前缀,然后对所得的Source进行转换,以使键与每个ByteString元素耦合。然后将Source展平并通过Flow解密。 Flow返回原始加密的ByteString和包含解密值的Option[String]

希望这至少会为您的使用案例提供一些启发和想法。

© www.soinside.com 2019 - 2024. All rights reserved.