如何在Scala中使用Future进行民意调查?

问题描述 投票:6回答:3

我想轮询API端点,直到达到某种条件。我希望它能在几秒到一分钟内达到这个状态。我有一个方法来调用返回Future的端点。有没有什么方法可以将Futures连接起来每隔n毫秒轮询这个端点并在t尝试后放弃?

假设我有一个带有以下签名的函数:

def isComplete(): Future[Boolean] = ???

在我看来,最简单的方法是阻止所有事情:

def untilComplete(): Unit = {
  for { _ <- 0 to 10 } {
    val status = Await.result(isComplete(), 1.seconds)
    if (status) return Unit
    Thread.sleep(100)
  }
  throw new Error("Max attempts")
}

但这可能会占用所有线程而且不是异步的。我还考虑过递归:

def untilComplete(
    f: Future[Boolean] = Future.successful(false),
    attempts: Int = 10
  ): Future[Unit] = f flatMap { status =>
    if (status) Future.successful(Unit)
    else if (attempts == 0) throw new Error("Max attempts")
    else {
      Thread.sleep(100)
      untilComplete(isComplete(), attempts - 1)
    }
}

但是,我担心最大化调用堆栈,因为这不是尾递归。

有没有更好的方法呢?

编辑:我正在使用akka

scala asynchronous akka future polling
3个回答
4
投票

你可以使用Akka Streams。例如,要每隔500毫秒调用isComplete,直到Future的结果为真,最多五次:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.Future
import scala.concurrent.duration._

def isComplete(): Future[Boolean] = ???

implicit val system = ActorSystem("MyExample")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

val stream: Future[Option[Boolean]] =
  Source(1 to 5)
    .throttle(1, 500 millis)
    .mapAsync(parallelism = 1)(_ => isComplete())
    .takeWhile(_ == false, true)
    .runWith(Sink.lastOption)

stream onComplete { result =>
  println(s"Stream completed with result: $result")
  system.terminate()
}

4
投票

它实际上根本不是递归的,所以堆栈会很好。

我能想到的一种改进方法是使用某种调度程序而不是Thread.sleep,这样就不会占用线程。

这个例子使用标准java的TimerTask,但如果你使用某种框架,比如akka,play或者其他什么,它可能有自己的调度程序,这将是一个更好的选择。

object Scheduler {
   val timer = new Timer(true)
   def after[T](d: Duration)(f :=> Future[T]): Future[T] = {
     val promise = Promise[T]()
     timer.schedule(TimerTask { def run() = promise.completeWith(f) }, d.toMillis)
     promise.future
   }
}


def untilComplete(attempts: Int = 10) = isComplete().flatMap { 
   case true => Future.successful(())
   case false if attempts > 1 => Scheduler.after(100 millis)(untilComplete(attempts-1))
   case _ => throw new Exception("Attempts exhausted.") 
}

3
投票

我给自己做了一个图书馆。我有

trait Poller extends AutoCloseable {
  def addTask[T]( task : Poller.Task[T] ) : Future[T]
  def close() : Unit
}

Poller.Task的样子

class Task[T]( val label : String, val period : Duration, val pollFor : () => Option[T], val timeout : Duration = Duration.Inf )

Poller民意调查每个period直到pollFor方法成功(产生Some[T])或超过timeout

为方便起见,当我开始轮询时,我将其包装成Poller.Task.withDeadline

final case class withDeadline[T] ( task : Task[T], deadline : Long ) {
  def timedOut = deadline >= 0 && System.currentTimeMillis > deadline
}

它将任务的(不可变的,可重复使用的)timeout持续时间转换为每次轮询尝试的最后期限。

为了有效地进行轮询,我使用Java的ScheduledExecutorService

def addTask[T]( task : Poller.Task[T] ) : Future[T] = {
  val promise = Promise[T]()
  scheduleTask( Poller.Task.withDeadline( task ), promise )
  promise.future
}

private def scheduleTask[T]( twd : Poller.Task.withDeadline[T], promise : Promise[T] ) : Unit = {
  if ( isClosed ) { 
    promise.failure( new Poller.ClosedException( this ) )
  } else {
    val task     = twd.task
    val deadline = twd.deadline

    val runnable = new Runnable {

      def run() : Unit = {
        try {
          if ( ! twd.timedOut ) {
            task.pollFor() match {
              case Some( value ) => promise.success( value )
              case None          => Abstract.this.scheduleTask( twd, promise )
            }
          } else {
            promise.failure( new Poller.TimeoutException( task.label, deadline ) )
          }
        }
        catch {
          case NonFatal( unexpected ) => promise.failure( unexpected )
        }
      }
    }

    val millis = task.period.toMillis
    ses.schedule( runnable, millis, TimeUnit.MILLISECONDS )
  }
}

它似乎运作良好,不需要睡觉或阻止个别Threads

(看看图书馆,可以做很多事情来使它更清晰,更容易阅读,并且通过制作该类Poller.Task.withDeadline的原始构造函数来阐明private的作用。截止日期应始终从任务timeout计算,不应该是任意的自由变量。)

此代码来自here (framework and trait)here (implementation)。 (如果你想使用它,直接的maven坐标是here。)

© www.soinside.com 2019 - 2024. All rights reserved.