如何从持久的石英作业中调度scala Future

问题描述 投票:0回答:1

假设,应该在java quartz library的帮助下安排一些scala代码。并且我们需要将该代码执行的结果存储在作业上下文中,以便在下一个作业执行中访问该结果。对于综合示例,有一些CounterService具有inc功能,应安排此时间:

trait CounterService {
  def inc(): Int
}

以下石英作业调用inc并将其结果成功存储在JobDataMap中:

@PersistJobDataAfterExecution
@DisallowConcurrentExecution
class CounterJob extends Job {
  val counterService: CounterService = ...

  override def execute(context: JobExecutionContext): Unit = {

    val newCounterValue: Int = counterService.inc()

    val map = context.getJobDetail.getJobDataMap
    map.put("counter", newCounterValue)  
  }
}

我们可以随时在其他地方获得工作结果(如果我们引用了scheduler):

val scheduler: Scheduler = ...
// gets details of our CounterJob which was created and registered in the scheduler
// by the name "counter-job" (it is not shown in our example)
val job = scheduler.getJobDetail(JobKey.jobKey("counter-job")) 
// this map will contain the job result which was stored by the key "counter"
val map = job.getJobDataMap.asScala 

但是如果我们想从石英作业中执行async代码,则此方法不起作用。例如,假设我们的柜台服务如下:

trait AsyncCounterService {
  def asyncInc(): Future[Int]
}

我们可以尝试通过以下方式实施我们的工作。但是它不能正常工作,因为方法CounterJob.execute可以比asyncCounterService.asyncInc早执行。而且我们无法将asyncInc的结果存储在JobDataMap中:

@PersistJobDataAfterExecution
@DisallowConcurrentExecution
class CounterJob extends Job {
  val counterService: AsyncCounterService = ...  
  val execContext: ExecutionContext = ...

  override def execute(context: JobExecutionContext): Unit = {

    // # 1: we can not influence on the execution flow of this future 
    //      from job scheduler.
    val counterFuture: Future[Int] = counterService.asyncInc() 

    counterFuture.map { counterValue: Int =>

      val map = context.getJobDetail.getJobDataMap  
      // #2: this action won't have any effect
      map.put("counter", counterValue)              
    }
  }
}

是此解决方案的至少两个问题,在上面的代码中标记为#1 ...#2 ...注释。

是否有解决此问题的更好方法?换句话说,如何在存有Future的情况下从持久性石英作业调度scala Future'sJobDetailData映射中?

java scala quartz-scheduler
1个回答
0
投票

如果CounterJob之后的所有内容都需要有counterService值,那么可以在CounterJob中阻塞并等待Future。那时候什么也不能执行,因为尚未计算该值。

import scala.concurrent.{Await,Future}
...

 try {
      val counterValue  = Await.result(counterFuture, 5.seconds)
      map.put("counter", counterValue)       
    } catch {
      case t: TimeoutException => ...
      case t: Exception => ...
   }

[如果您在该工作中具有多个异步期货,则可以将它们与flatMap, map操作for comprehension的单子链或Future伴随对象的静态助手方法(例如Future.sequence)结合使用那么最终结果将是结合所有异步操作的未来,您可以通过Await等待。

通常,等待期货被认为是不好的做法。因为这会阻止执行程序线程在等待将来完成时执行任何其他操作。

然而,您正在将另一个作业调度框架与另一个并发范例混合在一起。如上所述,在特定示例中,可以进行阻止,因为以后的所有操作都依赖于第一次计算。

如果其他作业可以同时运行,则有多种解决方法:

  1. [有一种方法可以使工作重归未来。然后,您可以等待这个将来完成,然后再安排依赖职位。
  2. 作业中有某种自定义事件侦听器机制,可以从作业中触发。 counterFuture.map {context.notify("computationReady")}
  3. [有特定的AsyncJob支持非阻塞io,它期望将Java Future作为返回值。然后,您可以将Scala Future转换为Java Future
© www.soinside.com 2019 - 2024. All rights reserved.