当使用akka流从S3消耗文件时出现超时异常。

问题描述 投票:0回答:1

我想用akka流的方式从S3中消耗一堆文件。

S3.listBucket("<bucket>", Some("<common_prefix>"))
  .flatMapConcat { r => S3.download("<bucket>", r.key) }
  .mapConcat(_.toList)
  .flatMapConcat(_._1)
  .via(Compression.gunzip())
  .via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
  .map(_.utf8String)
  .runForeach { x => println(x) }

在不增加... akka.http.host-connection-pool.response-entity-subscription-timeout 我得到

java.util.concurrent.TimeoutException: Response entity was not subscribed after 1 second. Make sure to read the response entity body or call discardBytes() on it. 对于 第二 文件,就在打印完第一个文件的最后一行后,当试图访问第二个文件的第一行时,出现了异常。

我理解这个异常的性质。我不明白为什么对第二个文件的请求已经在进行中,而第一个文件还在处理中。我想,这里面涉及到一些缓冲。

有什么办法可以消除这个异常吗? 不得不增加 akka.http.host-connection-pool.response-entity-subscription-timeout?

scala akka akka-stream akka-http alpakka
1个回答
0
投票

而不是将下载文件的处理合并到一个流中,并以 flatMapConcat 你可以尝试在外部流中实现流的物质化,在向下游发射输出之前,在那里完全处理它。 然后你不应该开始下载(并完全处理)下一个对象,直到你准备好。

一般来说,你希望避免有太多的流实体化以减少开销,但我怀疑这对于像这样执行网络IO的应用来说是可以忽略不计的。

如果这样的东西能用,请告诉我。(警告: 未测试)

S3.listBucket("<bucket>", Some("<common_prefix>"))
  .mapAsync(1) { result =>
    val contents = S3.download("<bucket>", r.key)
      .via(Compression.gunzip())
      .via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
      .map(_.utf8String)
      .to(Sink.seq)(Keep.right)
      .run()
    contents     
  }
  .mapConcat(identity)
  .runForeach { x => println(x) }
© www.soinside.com 2019 - 2024. All rights reserved.