基于 URL 或本地文件的流大小限制

问题描述 投票:0回答:1

我有一个 `akka-http` API,用户可以将 S3 URL 发送到服务器。随后,服务器从 AWS 拉取对应对象作为流,并对该 `Source` 执行后续操作。不过,我想在执行任何操作之前先验证传入 `stream` 的大小。但这里无法使用 `akka-http` 的 `withSizeLimit` 指令,因此我为此编写了一个自定义实现。

/** Stream attribute describing how many bytes a stream is allowed to emit.
  *
  * @param maxBytes      byte limit; any negative value disables limiting altogether
  * @param contentLength declared total size, if known up front
  *                      (presumably the entity's Content-Length — confirm with callers)
  */
final case class SizeLimit(maxBytes: Long, contentLength: Option[Long] = None) extends Attributes.Attribute {

  /** `true` when `maxBytes` is negative, meaning no limit applies. */
  def isDisabled: Boolean = 0L > maxBytes
}

/** Helpers that attach a `SizeLimit` attribute to a source and, when the limit is active,
  * insert a `Limitable` stage that enforces it.
  */
object Limitable {

  // Default attributes (the stage name) used by every Limitable stage.
  private val limitableDefaults = Attributes.name("limitable")

  /** Limits a `ByteString` source; each element counts as its byte length. */
  def applyForByteStrings[Mat](source: Source[ByteString, Mat], limit: SizeLimit): Source[ByteString, Mat] =
    applyLimit(source, limit)(bytes => bytes.size)

  /** Limits a `ChunkStreamPart` source; each chunk counts as the byte length of its data. */
  def applyForChunks[Mat](source: Source[ChunkStreamPart, Mat], limit: SizeLimit): Source[ChunkStreamPart, Mat] =
    applyLimit(source, limit)(chunk => chunk.data.size)

  /** Tags `source` with `limit`, adding a `Limitable` stage only when the limit is enabled.
    *
    * @param source stream to limit
    * @param limit  limit attached to the resulting source as an attribute
    * @param sizeOf measures the size of one element
    * @return the (possibly wrapped) source carrying the `SizeLimit` attribute
    */
  def applyLimit[T, Mat](source: Source[T, Mat], limit: SizeLimit)(sizeOf: T => Int): Source[T, Mat] = {
    // A disabled limit needs no extra stage: it's either already present upstream or not needed.
    val shaped = if (limit.isDisabled) source else source.via(new Limitable(sizeOf))
    shaped.withAttributes(Attributes(limit))
  }
}

/** Pass-through flow stage that fails the stream with `EntityStreamSizeException` once the
  * cumulative size of the elements it has seen exceeds the `SizeLimit` found in its attributes.
  *
  * Adapted from akka-http's `HttpEntity` implementation, with println-based tracing of
  * every push/pull for debugging.
  *
  * @param sizeOf measures one element (e.g. the byte count of a `ByteString`)
  * @tparam T element type, forwarded unchanged
  */
final class Limitable[T](sizeOf: T => Int) extends GraphStage[FlowShape[T, T]] {
  val in: Inlet[T] = Inlet[T]("Limitable.in")
  val out: Outlet[T] = Outlet[T]("Limitable.out")

  // Debug counters. NOTE(review): these live on the stage (the reusable blueprint), not in the
  // logic created per run, so every materialization of the same stage mutates the same values.
  // Move them into createLogic if per-run counts are intended.
  var numPullCalls = 0
  var numPushCalls = 0

  override val shape: FlowShape[T, T] = FlowShape.of(in, out)
  override protected val initialAttributes: Attributes = Limitable.limitableDefaults

  override def createLogic(_attributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
    // Limit reported in the failure; stays -1 unless a SizeLimit without contentLength is found.
    private var maxBytes = -1L
    // Remaining byte budget; Long.MaxValue means "count, but effectively never fail".
    private var bytesLeft = Long.MaxValue

    /** Reads the `SizeLimit` attribute and either arms the byte budget or fails up front. */
    @nowarn("msg=deprecated") // we need getFirst semantics
    override def preStart(): Unit =
      _attributes.getFirst[SizeLimit].foreach {
        case limit: SizeLimit if limit.isDisabled =>
          // "no limit": the budget stays at Long.MaxValue
        case SizeLimit(bytes, declared @ Some(declaredLength)) =>
          // The declared length alone breaks the limit: fail before any element flows.
          // Otherwise elements are still counted, but the budget is never armed, so no failure.
          if (declaredLength > bytes) failStage(EntityStreamSizeException(bytes, declared))
        case SizeLimit(bytes, None) =>
          maxBytes = bytes
          bytesLeft = bytes
      }

    /** Charges the incoming element against the budget, then forwards it or fails the stage. */
    override def onPush(): Unit = {
      numPushCalls += 1
      println(s"Push calls $numPushCalls")
      val element = grab(in)
      val elementSize = sizeOf(element)
      println(s"Elem size is $elementSize")
      bytesLeft -= elementSize
      if (bytesLeft < 0) {
        println(s"EntityStreamSizeException Bytes left $bytesLeft")
        failStage(EntityStreamSizeException(maxBytes))
      } else push(out, element)
    }

    /** Propagates downstream demand upstream, one element at a time. */
    override def onPull(): Unit = {
      numPullCalls += 1
      println(s"Pull calls $numPullCalls")
      pull(in)
    }

    setHandlers(in, out, this)
  }
}


// Demo: cap a local file source at 4,000,000 bytes and expose it as a java.io.InputStream.
// NOTE(review): the Limitable stage only sees elements that downstream actually demands.
// Nothing below reads from `result`/`tis`, so presumably only a small prefix of the file is
// pulled (bounded by internal buffers) and the 4,000,000-byte check is likely never reached.
// If the stream is read and the limit is exceeded, the failure presumably surfaces from
// InputStream.read (likely wrapped in an IOException) — confirm against the StreamConverters docs.
val filePath = Paths.get("/Users/<username>/Documents/bigfile.pdf") // placeholder path
val fileSource: Source[ByteString, Any] = FileIO.fromPath(filePath)
val res = Limitable.applyForByteStrings(fileSource, SizeLimit(4000000L))
val sink   = StreamConverters.asInputStream()
val result = res.runWith(sink)
val tis = TikaInputStream.get(result)

这个自定义实现参考了 akka-http 的源码:https://github.com/akka/akka-http/blob/main/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala

使用该自定义实现后,我期望当文件大小超过 4 MB(即 `SizeLimit(4000000L)`)时服务器会抛出 `EntityStreamSizeException`,但实际上它没有抛出任何异常。我遗漏了什么?

scala akka akka-stream akka-http
1个回答
0
投票

文档中写道:

> Demand flowing upstream leading to elements flowing downstream.

(即:需求向上游流动,才会使元素向下游流动。)

我没有在任何地方消费输出流,因此流最多只处理了 16 个元素,这是 `Sink` 默认的 `buffer` 大小。消费输出流的一种方法如下:

// Fix from the answer: run the stream with a sink that consumes every element, so demand keeps
// flowing upstream through the Limitable stage. Once more than 4,000,000 bytes have passed,
// the stage fails the stream.
// NOTE(review): the failure is presumably reported through the value returned by runForeach
// (discarded here) rather than thrown to the caller — inspect or await it to observe the error.
val fileSource: Source[ByteString, Any] = FileIO.fromPath(filePath)
val res = Limitable.applyForByteStrings(fileSource, SizeLimit(4000000L))
res.runForeach(println) // Consumes every element, so the sink keeps pulling from the GraphStage
© www.soinside.com 2019 - 2024. All rights reserved.