使用期货在scala中展开不带参数的函数

问题描述 投票:1回答:2

我正在尝试创建一个转换链来定义给定功能的可能转换:

type Transformation[T] = T => Future[T]

def transformationChain[T](chain: Seq[Transformation[T]]): Transformation[T] = {
}


val t1: Transformation[Int] = t => Future.successful(t + t)
val t2: Transformation[Int] = _ => Future.failed(new NoSuchElementException)
val t3: Transformation[Int] = t =>
    if (t > 2) Future.successful(t * t)
    else Future.failed(new NoSuchElementException)

val tc = transformationChain(Seq(t1, t2, t2, t3))
val tc2 = transformationChain(Seq(t2, t2, t2))
val tc3 = transformationChain(Seq(t2, t3, t1))

println(Await.result(tc(2), 5.seconds))  // 16
println(Await.result(tc2(2), 5.seconds)) // throw NoSuchElementException
println(Await.result(tc3(2), 5.seconds)) // 4

问题是我不了解如何在“ transformationChain”方法中解包这些函数,以通过在循环中或递归调用它们将结果发送到链中的每个下一个函数。

scala functional-programming concurrent.futures
2个回答
1
投票

您所说的Transformation(因此A => F [B]的功能)通常被称为Kleisli箭头。

Cats库具有数据类型,这使得对这些类型的函数的操作更加容易。例如,它具有方法andThen,该方法允许组合以下功能:

import cats.data.Kleisli
import cats.implicits._

val t1: Transformation[Int] = t => Future.successful(t + t)
val t2: Transformation[Int] = _ => Future.failed(new NoSuchElementException)

Kleisli(t1).andThen(Kleisli(t2))

唯一的问题是,您的一个转换可能会返回失败的未来,这会缩短整个链条。我们可以使用recoverWith

进行修复。

所以最终transformationChain可能看起来像:

def transformationChain[T](chain: Seq[Transformation[T]]): Transformation[T] =
    t =>
      chain
         //wrap the function in Kleisli and then use replace failed futures with succeeded
         //future, that are passing value over
        .map(Kleisli(_).recoverWith {
          case _ => Kleisli(x => Future.successful(x))
        })
        .reduce(_ andThen _) //combine all elements with andThen
        .apply(t)

对于情况1和3正常工作,但对于情况2失败,因为它将仅返回传递的值。

println(Await.result(tc(2), 5.seconds)) // 16
println(Await.result(tc3(2), 5.seconds)) // 4
println(Await.result(tc2(2), 5.seconds)) // 2

0
投票
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.Try

import ExecutionContext.Implicits.global

object Transformations {
  type Transformation[T] = T => Future[T]

  private object DummyException extends Exception
  private val notReallyAFailedFuture: Future[Throwable] = Future.failed(DummyException)

  def transformationChain[T](chain: Seq[Transformation[T]])(implicit ectx: ExecutionContext): Transformation[T] = t =>
    if (chain.nonEmpty) {
      val initialFut = Future.successful(t)
      // resultFut will succeed if any of the transformations in the chain succeeded
      // lastFailure will fail if all of the transformations succeeded, otherwise it has the last failure
      val (resultFut: Future[T], lastFailure: Future[Throwable]) =
        chain.foldLeft((Future.failed[T](DummyException), notReallyAFailedFuture)) { (acc, v) =>
          val thisResult = acc._1.recoverWith {
            case _ => initialFut
          }.flatMap(v)
          val lastFailure = thisResult.failed.recoverWith { case _ => acc._2 }
          (thisResult.recoverWith { case _ => acc._1 }, lastFailure)
        }
      resultFut.recoverWith {
        case _ =>
          lastFailure.flatMap(Future.failed)
      }
    } else Future.successful(t)   // What to do with an empty chain is unspecified

  def main(args: Array[String]): Unit = {
    import scala.concurrent.Await
    import scala.concurrent.duration._

    val t1: Transformation[Int] = t => Future.successful(t + t)
    val t2: Transformation[Int] = _ => Future.failed(new NoSuchElementException)
    val t3: Transformation[Int] = t =>
      if (t > 2) Future.successful(t * t)
      else Future.failed(new NoSuchElementException)

    val tc1 = transformationChain(Seq(t1, t2, t2, t3))
    val tc2 = transformationChain(Seq(t2, t2, t2))
    val tc3 = transformationChain(Seq(t2, t3, t1))

    println(Try(Await.result(tc1(2), 5.seconds)))
    println(Try(Await.result(tc2(2), 5.seconds)))
    println(Try(Await.result(tc3(2), 5.seconds)))
  }
}

此实现假定:

  • 如果多个转换失败,则返回最后一个失败
  • 如果链为空,则进行身份转换

transformationChain现在确实需要隐式ExecutionContext来安排转换期货之间的“胶水”功能。在Scala 2.13+中,scala.concurrent.ExecutionContext.parasitic上下文实际上是执行这些快速转换的不错选择(并且基本上对其他功能没有用)。

为了使所有println执行,我将Await.result包裹在Try中。

为了简洁起见,使用一些失败的Future来表示没有结果。

© www.soinside.com 2019 - 2024. All rights reserved.