我正在尝试构建使用S3密钥的代码,然后从S3下载这些文件,然后将该数据保存到具有键名的磁盘上的文件(流程中进一步需要该文件),并且输出返回键/文件名。 。到目前为止,我所拥有的是;
val x: Sink[String, Future[IOResult]] =
Flow[String].flatMapConcat(key => S3.download("somebucket", key)).
withAttributes(S3Attributes.settings(useVersion1Api)).
collect{ case Some(x) => x._1 }.
flatMapConcat(identity).toMat(FileIO.toPath(Paths.get("???????")))(Keep.right)
我目前拥有的下载文件,但没有下载;-使用密钥名称作为文件名-返回文件名(它不应该是接收器,而是流)
我将不胜感激。我只是从alpakka和akka流开始的。我可能需要某种方式在元组中传递密钥,但是我似乎无法弄清楚以后如何使用元组的那一部分。
使用cchantep的第一个建议可能会解决;
val s3FileSaveFlow: Flow[String, (String, ObjectMetadata), NotUsed] =
Flow[String].flatMapConcat(key => S3.download("somebucket", key) collect{ case Some(src) => key -> src}).
flatMapConcat{ case (key,(src,meta)) => {
src.to(FileIO.toPath(Paths.get(key)))
Source.single((key,meta))
}}
val x: Sink[String, Future[IOResult]] =
Flow[String].flatMapConcat(key =>
S3.download("somebucket", key).collect {
case Some(src) => key -> SRC
})