[处理一系列对象时的链式Scala期货?

问题描述 投票:-1回答:1
import scala.concurrent.duration.Duration
import scala.concurrent.duration.Duration._
import scala.concurrent.{Await, Future}
import scala.concurrent.Future._
import scala.concurrent.ExecutionContext.Implicits.global

object TestClass {

  final case class Record(id: String)

  final case class RecordDetail(id: String)

  final case class UploadResult(result: String)

  val ids: Seq[String] = Seq("a", "b", "c", "d")

  def fetch(id: String): Future[Option[Record]] = Future {
    Thread sleep 100
    if (id != "b" && id != "d") {
      Some(Record(id))
    } else None
  }

  def fetchRecordDetail(record: Record): Future[RecordDetail] = Future {
    Thread sleep 100
    RecordDetail(record.id + "_detail")
  }

  def upload(recordDetail: RecordDetail): Future[UploadResult] = Future {
    Thread sleep 100
    UploadResult(recordDetail.id + "_uploaded")
  }

  def notifyUploaded(results: Seq[UploadResult]): Unit = println("notified " + results)

  def main(args: Array[String]): Unit = {

    //for each id from ids, call fetch method and if record exists call fetchRecordDetail 
    //and after that upload RecordDetail, collect all UploadResults into seq
    //and call notifyUploaded with that seq and await result, you should see "notified ...." in console


    // In the following line of code how do I pass result of fetch to fetchRecordDetail function
    val result = Future.traverse(ids)(x => Future(fetch(x)))
    // val result: Future[Unit] = ???

    Await.ready(result, Duration.Inf)
  }

}

我的问题是,我不知道要在main中放入什么代码才能使其按注释中的代码工作。总而言之,我有一个ids:Seq[String],我希望每个id都通过异步方法fetchfetchRecordDetailupload,最后是整个Seq进入notifyUploaded

scala sequence traversal
1个回答
2
投票

我认为最简单的方法是:

  def main(args: Array[String]): Unit = {

    //for each id from ids, call fetch method and if record exists call fetchRecordDetail
    //and after that upload RecordDetail, collect all UploadResults into seq
    //and call notifyUploaded with that seq and await result, you should see "notified ...." in console

    def runWithOption[A, B](f: A => Future[B], oa: Option[A]): Future[Option[B]] = oa match {
      case Some(a) => f(a).map(b => Some(b))
      case None => Future.successful(None)
    }

    val ids: Seq[String] = Seq("a", "b", "c", "d")

    val resultSeq: Seq[Future[Option[UploadResult]]] = ids.map(id => {
      for (or: Option[Record] <- fetch(id);
           ord: Option[RecordDetail] <- runWithOption(fetchRecordDetail, or);
           our: Option[UploadResult] <- runWithOption(upload, ord)
      ) yield our
    })

    val filteredResult: Future[Seq[UploadResult]] = Future.sequence(resultSeq).map(s => s.collect({ case Some(ur) => ur }))
    val result: Future[Seq[UploadResult]] = filteredResult.andThen({ case Success(s) => notifyUploaded(s) })

    Await.ready(result, Duration.Inf)
  }

这个想法是,您首先通过所有方法都获得了一个Seq[Future[_]],即您map(这里是通过理解来完成的)。这里有一个重要的技巧是实际通过Seq[Future[Option[_]]]。通过Option[_] helper方法在整个链中传递runWithOption可以极大地简化代码,而无需在最后阶段进行阻塞。

然后将Seq[Future[_]]转换为Future[Seq[_]],并过滤掉在ids阶段失败的那些fetch的结果。最后您申请notifyUploaded

P.S。请注意,此代码中没有任何错误处理,也不清楚在不同阶段出现错误时您的期望行为。

© www.soinside.com 2019 - 2024. All rights reserved.