任何重用Source [ByteString,Any]的方法(不将其保留在内存中)

问题描述 投票:1回答:1

有没有办法使Source可重用?

我有一个akka-http服务器接收大文件上传,然后通过HTTP POST将(chunked)数据流式传输到订户websockets和其他HTTP服务器。在这两种情况下,都有一个接受Source [ByteString,Any]的API:

  • 在HTTP POST的情况下HttpEntity(...,source)
  • websocket的BinaryMessage(源代码)

使用这些API比采用单个ByteString的版本具有一些优势(只需要执行单个HTTP帖子,可以重新创建相同的分块消息等)。

那么有没有办法让这样的工作(没有缓冲内存中的一切)?

val allSinks: Seq[Sink[Source[ByteString, Any], Future[Done]]] = ???

val g = RunnableGraph.fromGraph(GraphDSL.create(allSinks) { implicit builder => sinks =>
  import GraphDSL.Implicits._

  // Broadcast with an output for each subscriber
  val broadcast = builder.add(Broadcast[DataSource](sinks.size))
  Source.single(source) ~> broadcast
  sinks.foreach(broadcast ~> _)
  ClosedShape
})
akka-stream akka-http
1个回答
0
投票

来源不可重复使用

不幸的是,Source在耗尽后无法重复使用。可以重新使用数据的基础“源”来创建单独的Source值,但每个值最多只能在一个流上运行。

坚持

如果需要重放功能,那么流式传输的数据将需要存储在持久性机制中,以便以后重放。这个机制可以是文件系统,数据库,Kafka,......

下面是使用文件系统的模型。

传入的POST消息正文可以在写入模式下流式传输到文件:

post {
  path(Segment) { fileName =>
    extractRequestEntity { entity =>
      complete {
        entity
          .dataBytes
          .toMat(FileIO.toPath(Paths.get(fileName), Set(CREATE_NEW, WRITE)))(Keep.Right)
          .run()
          .andThen {
            case Success(ioResult) =>
              StatusCodes.Ok -> s"wrote ${ioResult.count} bytes"
            case Failure(ex) =>
              StatusCodes.InternalServerError -> ex.toString
          } 
        }
      }
    }
  }
}

那么就没有必要创建一个Broadcast集线器,只需使用该文件的内容响应GET请求:

path(Segment) { fileName =>
  getFromFile(fileName)
}

这利用了这样一个事实,即大多数操作系统将允许您作为字节流写入文件,同时从文件读取字节流...

© www.soinside.com 2019 - 2024. All rights reserved.