有没有办法使Source可重用?
我有一个akka-http服务器接收大文件上传,然后通过HTTP POST将(chunked)数据流式传输到订户websockets和其他HTTP服务器。在这两种情况下,都有一个接受Source [ByteString,Any]的API:
使用这些API比采用单个ByteString的版本具有一些优势(只需要执行单个HTTP帖子,可以重新创建相同的分块消息等)。
那么有没有办法让这样的工作(没有缓冲内存中的一切)?
val allSinks: Seq[Sink[Source[ByteString, Any], Future[Done]]] = ???
val g = RunnableGraph.fromGraph(GraphDSL.create(allSinks) { implicit builder => sinks =>
import GraphDSL.Implicits._
// Broadcast with an output for each subscriber
val broadcast = builder.add(Broadcast[DataSource](sinks.size))
Source.single(source) ~> broadcast
sinks.foreach(broadcast ~> _)
ClosedShape
})
来源不可重复使用
不幸的是,Source
在耗尽后无法重复使用。可以重新使用数据的基础“源”来创建单独的Source
值,但每个值最多只能在一个流上运行。
坚持
如果需要重放功能,那么流式传输的数据将需要存储在持久性机制中,以便以后重放。这个机制可以是文件系统,数据库,Kafka,......
下面是使用文件系统的模型。
传入的POST
消息正文可以在写入模式下流式传输到文件:
post {
path(Segment) { fileName =>
extractRequestEntity { entity =>
complete {
entity
.dataBytes
.toMat(FileIO.toPath(Paths.get(fileName), Set(CREATE_NEW, WRITE)))(Keep.Right)
.run()
.andThen {
case Success(ioResult) =>
StatusCodes.Ok -> s"wrote ${ioResult.count} bytes"
case Failure(ex) =>
StatusCodes.InternalServerError -> ex.toString
}
}
}
}
}
}
那么就没有必要创建一个Broadcast
集线器,只需使用该文件的内容响应GET
请求:
path(Segment) { fileName =>
getFromFile(fileName)
}
这利用了这样一个事实,即大多数操作系统将允许您作为字节流写入文件,同时从文件读取字节流...