我想用akka流的方式从S3中消耗一堆文件。
S3.listBucket("<bucket>", Some("<common_prefix>"))
.flatMapConcat { r => S3.download("<bucket>", r.key) }
.mapConcat(_.toList)
.flatMapConcat(_._1)
.via(Compression.gunzip())
.via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
.map(_.utf8String)
.runForeach { x => println(x) }
在不增加... akka.http.host-connection-pool.response-entity-subscription-timeout
我得到
java.util.concurrent.TimeoutException: Response entity was not subscribed after 1 second. Make sure to read the response entity body or call discardBytes() on it.
对于 第二 文件,就在打印完第一个文件的最后一行后,当试图访问第二个文件的第一行时,出现了异常。
我理解这个异常的性质。我不明白为什么对第二个文件的请求已经在进行中,而第一个文件还在处理中。我想,这里面涉及到一些缓冲。
有什么办法可以消除这个异常吗?橆 不得不增加 akka.http.host-connection-pool.response-entity-subscription-timeout
?
而不是将下载文件的处理合并到一个流中,并以 flatMapConcat
你可以尝试在外部流中实现流的物质化,在向下游发射输出之前,在那里完全处理它。 然后你不应该开始下载(并完全处理)下一个对象,直到你准备好。
一般来说,你希望避免有太多的流实体化以减少开销,但我怀疑这对于像这样执行网络IO的应用来说是可以忽略不计的。
如果这样的东西能用,请告诉我。(警告: 未测试)
S3.listBucket("<bucket>", Some("<common_prefix>"))
.mapAsync(1) { result =>
val contents = S3.download("<bucket>", r.key)
.via(Compression.gunzip())
.via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
.map(_.utf8String)
.to(Sink.seq)(Keep.right)
.run()
contents
}
.mapConcat(identity)
.runForeach { x => println(x) }