我的代码使用 Akka 流将二进制文件上传到 s3,如下所示:
source
.via(distributeChunks(MAX_BYTES_PER_CHUNK))
.throttle(maxRequestsPerSecond, 1.second, maximumBurst = 1, ThrottleMode.Shaping)
.runFoldAsync(1) { (partNumber, bytes) =>
{...}
我需要以这样的方式编写distributeChunks:如果字节大小小于
MAX_BYTES_PER_CHUNK
,它应该以MAX_BYTES_PER_CHUNK
或小于最后一个块的方式中断每个传入流。
我试过这个:
private def distributeChunks(maxChunkSize: Int): Flow[ByteString, ByteString, NotUsed] =
Flow[ByteString]
.statefulMapConcat { () =>
var buffer = ByteString.empty
{ bs: ByteString =>
buffer ++= bs
val chunks = new ArrayBuffer[ByteString]
while (buffer.length >= maxChunkSize) {
val (chunk, rest) = buffer.splitAt(maxChunkSize)
chunks += chunk
buffer = rest
}
chunks.toList
}
}
.mapMaterializedValue(_ => NotUsed)
这确保每个块大小等于 MAX_BYTES_PER_CHUNK 但它错过了最后一个块,我对如何解决这个问题有点困惑。有人可以帮助我更好地模拟这个并想出正确的代码来获得期望的结果吗?
这里有两个测试用例:
FILE SIZE: 10MB, MAX_PERMISSIBLE_CHUNK: 2MB should break into 5 chunks of 2MB.
FILE SIZE: 9MB, MAX_PERMISSIBLE_CHUNK: 2MB should break into 4 chunks of 2MB and 1 chunk of exactly 1MB.
经过大量调试后,我意识到我需要使用自定义入口和出口处理程序以确保正确累积字节:
def distributeChunks(maxChunkSize: Int): Flow[ByteString, ByteString, NotUsed] =
Flow.fromGraph(new GraphStage[FlowShape[ByteString, ByteString]] {
val in: Inlet[ByteString] = Inlet[ByteString]("distributeChunks.in")
val out: Outlet[ByteString] = Outlet[ByteString]("distributeChunks.out")
override val shape: FlowShape[ByteString, ByteString] = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
var buffer: ByteString = ByteString.empty
setHandler(in, new InHandler {
override def onPush(): Unit = {
buffer ++= grab(in)
if (buffer.length >= maxChunkSize) {
val (chunk, rest) = buffer.splitAt(maxChunkSize)
buffer = rest
push(out, chunk)
} else {
pull(in)
}
}
override def onUpstreamFinish(): Unit = {
if (buffer.nonEmpty) {
emit(out, buffer)
}
completeStage()
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
if (!hasBeenPulled(in)) {
pull(in)
}
}
})
}
})
while 循环的问题是它无法控制从客户端注入的字节,这可能会导致问题。它可能取决于网络配置和许多其他因素,因此我使用自定义流来解决这个问题。
还刚刚注意到 Akka 现在处于 BSL 之下,这令人遗憾,希望旧功能可以按原样使用。 :)