Scala线程池 - 并发调用API的方法

问题描述 投票:1回答:1

我在databricks有一个用例,其中一个API调用必须在一个URL的数据集上进行,这个数据集有100K左右的记录,最大允许并发量是3。这个数据集有大约100K条记录,最大允许的并发数是3。 我用Scala实现了这个功能,并在databricks的笔记本上运行。除了队列中的一个元素待定外,我觉得这里还缺少一些东西。Blocking Queue和Thread Pool是否是解决这个问题的正确方法。

在下面的代码中,我已经修改了,我不是从数据集读取,而是在Seq上采样。任何帮助思想将是非常感激的。

 
import java.time.LocalDateTime
import java.util.concurrent.{ArrayBlockingQueue,BlockingQueue}
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit; 

var inpQueue:BlockingQueue[(Int, String)] = new ArrayBlockingQueue[(Int, String)](1)

val inpDS = Seq((1,"https://google.com/2X6barD"), (2,"https://google.com/3d9vCgW"), (3,"https://google.com/2M02Xz0"), (4,"https://google.com/2XOu2uL"), (5,"https://google.com/2AfBWF0"), (6,"https://google.com/36AEKsw"), (7,"https://google.com/3enBxz7"), (8,"https://google.com/36ABq0x"), (9,"https://google.com/2XBjmiF"), (10,"https://google.com/36Emlen"))


val pool = Executors.newFixedThreadPool(3) 
var i = 0
inpDS.foreach{
  ix => {

    inpQueue.put(ix)
    val t = new ConsumerAPIThread()
    t.setName("MyThread-"+i+" ")
    pool.execute(t)

  }
   i = i+1
}

println("Final Queue Size = " +inpQueue.size+"\n")


class ConsumerAPIThread() extends Thread  
{ 
  var name =""

    override def run() 
    { 
        val urlDetail =  inpQueue.take()
        print(this.getName()+" "+ Thread.currentThread().getName() + " popped "+urlDetail+" Queue Size "+inpQueue.size+" \n") 
      triggerAPI((urlDetail._1, urlDetail._2))
    } 

    def triggerAPI(params:(Int,String)){

    try{
      val result = scala.io.Source.fromURL(params._2)
      println("" +result)
    }catch{
     case ex:Exception  => {

       println("Exception caught")
       }

    }

  }
   def ConsumerAPIThread(s:String) 
    { 
        name = s; 
    } 
}
scala apache-spark functional-programming databricks blockingqueue
1个回答
2
投票

所以,你有两个需求:功能性需求是你想异步处理列表中的项目,非功能性需求是你想一次不处理超过三个项目。

关于后者,好的一点是,正如你在问题中已经表明的那样,Java本机提供了一个很好的打包的 Executor 在一个固定大小的线程池上运行任务,如果你使用线程,可以优雅地允许你对并发级别进行限制。

在功能需求方面,Scala通过在其标准API中加入这样的功能来帮助我们。特别是它使用了 scala.concurrent.Future因此,为了使用它,我们必须重构一下 triggerAPI 就...而言 Future. 这个函数的内容并不是特别相关,所以我们现在主要关注它的(修改后)签名。

import scala.concurrent.Future
import scala.concurrent.ExecutionContext

def triggerAPI(params: (Int, String))(implicit ec: ExecutionContext): Future[Unit] =
  Future {
    // some code that takes some time to run...
  }

注意,现在 triggerAPI 返回一个 Future. A Future 可以被认为是对最终要计算的东西的读柄。特别是,这是一个 Future[Unit],其中 Unit 代表 "我们不是特别在意这个函数的输出,主要是在意它的副作用".

此外,请注意,该方法现在取一个 隐性参数ExecutionContext. 该 ExecutionContext 是用来提供 Future的环境中进行计算。Scala有一个API来创建一个 ExecutionContextjava.util.concurrent.ExecutorService因此,这将会很方便地在固定的线程池上运行我们的计算,在任何时候运行不超过三个回调。

在前进之前,如果你有关于 Futures, ExecutionContexts和 隐性参数,Scala文档是你最好的知识来源(这里有几个要点。1, 2).

现在我们有了新的 triggerAPI 方法,我们可以使用 Future.traverse (这里是Scala 2.12的文档。 --在写这篇文章的时候,最新的版本是2.13,但据我所知,Spark用户目前还停留在2.12上)。)

这个 闲聊Future.traverse 就是说,它采用某种形式的容器和一个函数,将容器中的项目取出并返回一个 Future 的其他东西。该函数将被应用于容器中的每一个项目,结果将是一个 Future 的容器的结果。在你的例子中:容器是一个 List这些项目是 (Int, String) 而你返回的东西是一个 Unit.

这意味着,你可以简单地这样调用它。

Future.traverse(inpDS)(triggerAPI)

And triggerAPI 中的每个项目都将适用于 inpDS.

通过确保线程池支持的执行上下文在调用 Future.traverse,这些项目将用所需的线程池进行处理。

调用的结果是 Future[List[Unit]]这不是很有趣,可以直接抛弃(因为你只对副作用感兴趣)。

说了这么多,如果你想玩玩我描述的代码,你可以这样做。在Scastie上.

供参考,这是整个实现。

import java.util.concurrent.{ExecutorService, Executors}

import scala.concurrent.duration.DurationLong
import scala.concurrent.Future
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}

val datasets = List(
  (1, "https://google.com/2X6barD"),
  (2, "https://google.com/3d9vCgW"),
  (3, "https://google.com/2M02Xz0"),
  (4, "https://google.com/2XOu2uL"),
  (5, "https://google.com/2AfBWF0"),
  (6, "https://google.com/36AEKsw"),
  (7, "https://google.com/3enBxz7"),
  (8, "https://google.com/36ABq0x"),
  (9, "https://google.com/2XBjmiF")
)

val executor: ExecutorService = Executors.newFixedThreadPool(3)
implicit val executionContext: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(executor)

def triggerAPI(params: (Int, String))(implicit ec: ExecutionContext): Future[Unit] =
  Future {
    val (index, _) = params
    println(s"+ started processing $index")
    val start = System.nanoTime() / 1000000
    Iterator.from(0).map(_ + 1).drop(100000000).take(1).toList.head // a noticeably slow operation
    val end = System.nanoTime() / 1000000
    val duration = (end - start).millis
    println(s"- finished processing $index after $duration")
  }

Future.traverse(datasets)(triggerAPI).onComplete {
  case result =>
    println("* processing is over, shutting down the executor")
    executionContext.shutdown()
}

0
投票

你需要关闭 Executor 在你的工作完成后,否则它将等待。

试试添加 pool.shutdown() 你的程序结束。

© www.soinside.com 2019 - 2024. All rights reserved.