我是并行执行和scala的新手。我有一些关于在scala中使用Future的问题。
我相信Future是允许异步并行执行的。所以,根据我的理解,在下面的代码中。donutStock
方法会在一个单独的线程上运行。官方文件也说了,它不会阻塞主线程。所以如果主线程没有被阻塞,那么新的子线程和主线程应该是并行执行的。
所以在下面的例子中,我期望一旦调用donutStock方法,主线程上的控制就应该前进,然后主线程应该调用另一个线程上的第二个donutStock方法。
但是,我注意到,第二个方法是在第一次调用完成后才被调用的。我对非阻塞或异步的理解是否正确?如果我想同时执行两个方法的调用是并行的,那么正确的方法是什么。
我看了一下,我们应该在服务器主线程中进行异步操作。在这种情况下,异步操作的好处是什么呢?
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
def donutStock(donut: String): Future[Int] = Future {
(1 until 100).foreach { value ⇒
println(s"checking donut stock $donut")
}
10
}
donutStock("My Donut").onComplete{
case Success(value) ⇒ println("Call 1 Completed")
case Failure(exception) ⇒ println("Call 1 Failed")
}
donutStock("Your Donut").onComplete{
case Success(value) ⇒ println("Call 2 Completed")
case Failure(exception) ⇒ println("Call 2 Failed")
}
当Future被创建时,它通常使用一个线程立即启动。如果在你当前的执行上下文中没有可用的线程,那么它可能不会立即启动你的未来,而是等待线程被释放。
如果你的执行上下文中只有一个线程可用,那么可能会发生下一个未来的执行将不得不等待上一个未来的完成。
通常情况下,执行上下文会有更多的线程可用(例如在scala的全局执行上下文中,线程数默认为可用线程数)。
在你的情况下,问题可能是,你的第一个未来可能完成的太快,以至于在第二个未来开始之前就完成了。
你可以通过在打印值后引入小的延迟来缓解这个问题,例如通过在打印值后添加 Thread.sleep(10)
之后 println(s"checking donut stock $donut")
.
这可能会导致另一个问题,由于期货是在守护线程中开始的,可能会发生主线程在期货执行结束前终止的情况。在这种情况下,它们将在调用 onComplete
回调。
你可以避免这种情况,你可以使用 Await
比如说,我们可以等待未来。
import scala.concurrent._
import scala.concurrent.duration._
val f1 = donutStock("My Donut").onComplete{
case Success(value) ⇒ println("Call 1 Completed")
case Failure(exception) ⇒ println("Call 1 Failed")
}
val f2 = donutStock("Your Donut").onComplete{
case Success(value) ⇒ println("Call 2 Completed")
case Failure(exception) ⇒ println("Call 2 Failed")
}
val result1 = Await.result(f1, 1 second)
val result2 = Await.result(f2, 1 second)
如果我们可以等待未来,什么是使用的情况下,以 onComplete
回调?例如,当我们定义了一个返回Future的函数,而我们又不想阻止它使用 Await
但我们还是希望在未来完成后执行一些操作。
例如,你可以修改你的 donutStock
如下图所示。
def donutStock(donut: String, idx: Int): Future[Int] = {
val f = Future {
(1 until 100).foreach { value ⇒
println(s"checking donut stock $donut")
}
10
}
//we don't block future, but onComplete callback will be still executed when future ends
f.onComplete{
case Success(value) ⇒ println(s"Call $idx Completed")
case Failure(exception) ⇒ println(s"Call $idx Failed")
}
f
}
Future是Scala中编写多线程代码的标准机制。每当我们创建一个新的Future操作时,Scala就会生成一个新的线程来运行该Future的代码,并在完成后执行任何提供的回调。
为了使用Futures,Scala要求我们提供一个隐式执行上下文,它控制Futures执行的线程池。我们可以创建自己的执行上下文,或者使用默认的执行上下文,这通常就足够了。默认的执行上下文是由Fork Join Thread Pool支持的。从代码中可以看出,这个例子使用的是隐式上下文。
def donutStock(donut: String): Future[Int] = Future {
(1 until 100).foreach { value ⇒
println(s"checking donut stock $donut")
}
10
}
上面所附的代码将在自己的线程中执行,当该函数的 donutStock(<string>)
返回类型为 Future[Int]
.
Scala允许我们定义回调函数,在Future成功或失败时执行。同时,创建Future的线程被解封,可以继续执行,如下图。
donutStock("My Donut").onComplete{
case Success(value) ⇒ println("Call 1 Completed")
case Failure(exception) ⇒ println("Call 1 Failed")
}
donutStock("Your Donut").onComplete{
case Success(value) ⇒ println("Call 2 Completed")
case Failure(exception) ⇒ println("Call 2 Failed")
}
在Future donutStock()成功完成后,onComplete回调会收到一个Success对象,其中包含了作为 10
.