[Alpakka S3从存储桶下载文件,保存到文件,并且文件名可用于流程的下一部分

问题描述 投票:0回答:1

我正在尝试构建使用S3密钥的代码,然后从S3下载这些文件,然后将该数据保存到具有键名的磁盘上的文件(流程中进一步需要该文件),并且输出返回键/文件名。 。到目前为止,我所拥有的是;

    val x: Sink[String, Future[IOResult]] =
      Flow[String].flatMapConcat(key => S3.download("somebucket", key)).
        withAttributes(S3Attributes.settings(useVersion1Api)).
        collect{ case Some(x) => x._1 }.
        flatMapConcat(identity).toMat(FileIO.toPath(Paths.get("???????")))(Keep.right)

我目前拥有的下载文件,但没有下载;-使用密钥名称作为文件名-返回文件名(它不应该是接收器,而是流)

我将不胜感激。我只是从alpakka和akka流开始的。我可能需要某种方式在元组中传递密钥,但是我似乎无法弄清楚以后如何使用元组的那一部分。

使用cchantep的第一个建议可能会解决;

    val s3FileSaveFlow: Flow[String, (String, ObjectMetadata), NotUsed] =
      Flow[String].flatMapConcat(key => S3.download("somebucket", key) collect{ case Some(src) => key -> src}).
        flatMapConcat{ case (key,(src,meta)) => {
          src.to(FileIO.toPath(Paths.get(key)))
          Source.single((key,meta))
        }}
scala amazon-s3 akka-stream alpakka
1个回答
1
投票
val x: Sink[String, Future[IOResult]] =
  Flow[String].flatMapConcat(key => 
    S3.download("somebucket", key).collect {
      case Some(src) => key -> SRC
    })
© www.soinside.com 2019 - 2024. All rights reserved.