我正在尝试创建一个转换链来定义给定功能的可能转换:
type Transformation[T] = T => Future[T]
def transformationChain[T](chain: Seq[Transformation[T]]): Transformation[T] = {
}
val t1: Transformation[Int] = t => Future.successful(t + t)
val t2: Transformation[Int] = _ => Future.failed(new NoSuchElementException)
val t3: Transformation[Int] = t =>
if (t > 2) Future.successful(t * t)
else Future.failed(new NoSuchElementException)
val tc = transformationChain(Seq(t1, t2, t2, t3))
val tc2 = transformationChain(Seq(t2, t2, t2))
val tc3 = transformationChain(Seq(t2, t3, t1))
println(Await.result(tc(2), 5.seconds)) // 16
println(Await.result(tc2(2), 5.seconds)) // throw NoSuchElementException
println(Await.result(tc3(2), 5.seconds)) // 4
问题是我不了解如何在“ transformationChain”方法中解包这些函数,以通过在循环中或递归调用它们将结果发送到链中的每个下一个函数。
您所说的Transformation(因此A => F [B]的功能)通常被称为Kleisli箭头。
Cats库具有数据类型,这使得对这些类型的函数的操作更加容易。例如,它具有方法andThen,该方法允许组合以下功能:
import cats.data.Kleisli
import cats.implicits._
val t1: Transformation[Int] = t => Future.successful(t + t)
val t2: Transformation[Int] = _ => Future.failed(new NoSuchElementException)
Kleisli(t1).andThen(Kleisli(t2))
唯一的问题是,您的一个转换可能会返回失败的未来,这会缩短整个链条。我们可以使用recoverWith。
进行修复。所以最终transformationChain可能看起来像:
def transformationChain[T](chain: Seq[Transformation[T]]): Transformation[T] =
t =>
chain
//wrap the function in Kleisli and then use replace failed futures with succeeded
//future, that are passing value over
.map(Kleisli(_).recoverWith {
case _ => Kleisli(x => Future.successful(x))
})
.reduce(_ andThen _) //combine all elements with andThen
.apply(t)
对于情况1和3正常工作,但对于情况2失败,因为它将仅返回传递的值。
println(Await.result(tc(2), 5.seconds)) // 16
println(Await.result(tc3(2), 5.seconds)) // 4
println(Await.result(tc2(2), 5.seconds)) // 2
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.Try
import ExecutionContext.Implicits.global
object Transformations {
type Transformation[T] = T => Future[T]
private object DummyException extends Exception
private val notReallyAFailedFuture: Future[Throwable] = Future.failed(DummyException)
def transformationChain[T](chain: Seq[Transformation[T]])(implicit ectx: ExecutionContext): Transformation[T] = t =>
if (chain.nonEmpty) {
val initialFut = Future.successful(t)
// resultFut will succeed if any of the transformations in the chain succeeded
// lastFailure will fail if all of the transformations succeeded, otherwise it has the last failure
val (resultFut: Future[T], lastFailure: Future[Throwable]) =
chain.foldLeft((Future.failed[T](DummyException), notReallyAFailedFuture)) { (acc, v) =>
val thisResult = acc._1.recoverWith {
case _ => initialFut
}.flatMap(v)
val lastFailure = thisResult.failed.recoverWith { case _ => acc._2 }
(thisResult.recoverWith { case _ => acc._1 }, lastFailure)
}
resultFut.recoverWith {
case _ =>
lastFailure.flatMap(Future.failed)
}
} else Future.successful(t) // What to do with an empty chain is unspecified
def main(args: Array[String]): Unit = {
import scala.concurrent.Await
import scala.concurrent.duration._
val t1: Transformation[Int] = t => Future.successful(t + t)
val t2: Transformation[Int] = _ => Future.failed(new NoSuchElementException)
val t3: Transformation[Int] = t =>
if (t > 2) Future.successful(t * t)
else Future.failed(new NoSuchElementException)
val tc1 = transformationChain(Seq(t1, t2, t2, t3))
val tc2 = transformationChain(Seq(t2, t2, t2))
val tc3 = transformationChain(Seq(t2, t3, t1))
println(Try(Await.result(tc1(2), 5.seconds)))
println(Try(Await.result(tc2(2), 5.seconds)))
println(Try(Await.result(tc3(2), 5.seconds)))
}
}
此实现假定:
transformationChain
现在确实需要隐式ExecutionContext
来安排转换期货之间的“胶水”功能。在Scala 2.13+中,scala.concurrent.ExecutionContext.parasitic
上下文实际上是执行这些快速转换的不错选择(并且基本上对其他功能没有用)。
为了使所有println
执行,我将Await.result
包裹在Try
中。
为了简洁起见,使用一些失败的Future
来表示没有结果。