Scala thread pool: calling an API concurrently

Question (1 vote, 1 answer)

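I have a use case in Databricks where an API call must be made for each URL in a dataset. The dataset has about 100,000 records, and the maximum allowed concurrency is 3. I implemented this in Scala and ran it in a Databricks notebook. Apart from one element that stays pending in the queue, I feel something is missing here. Are a blocking queue and a thread pool the right approach to this problem?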
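For reference, a fixed thread pool already enforces a concurrency cap by itself: Executors.newFixedThreadPool(3) runs at most three tasks at a time and holds every further submitted task in its own internal queue. The sketch below checks this with counters. The object name ConcurrencyCheck, the task count of 20 and the Thread.sleep standing in for an HTTP call are illustrative assumptions, not part of the original code.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: submits `tasks` simulated API calls to a fixed pool and reports
// (tasks completed, highest number of tasks observed running at the same time)
object ConcurrencyCheck {
  def run(tasks: Int, poolSize: Int): (Int, Int) = {
    val pool      = Executors.newFixedThreadPool(poolSize)
    val running   = new AtomicInteger(0) // tasks currently executing
    val maxSeen   = new AtomicInteger(0) // highest concurrency observed
    val completed = new AtomicInteger(0) // tasks that have finished

    for (_ <- 1 to tasks) {
      // Tasks beyond poolSize wait in the pool's internal queue until a thread frees up
      pool.execute(() => {
        val now = running.incrementAndGet()
        maxSeen.accumulateAndGet(now, (a, b) => math.max(a, b))
        Thread.sleep(50) // stand-in for a slow HTTP call
        running.decrementAndGet()
        completed.incrementAndGet()
        ()
      })
    }

    pool.shutdown()                            // accept no new tasks; queued ones still run
    pool.awaitTermination(1, TimeUnit.MINUTES) // wait until all of them have finished
    (completed.get, maxSeen.get)
  }
}

val (completed, maxSeen) = ConcurrencyCheck.run(tasks = 20, poolSize = 3)
println(s"completed = $completed, max running at once = $maxSeen")
```

The work is wrapped in an object method rather than written as top-level statements so that worker threads never touch a script or notebook wrapper that is still initializing.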

In the code below, I changed the input so that it samples from a Seq instead of reading from the dataset. Any help or ideas would be appreciated.


import java.time.LocalDateTime
import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Capacity 1: put() blocks until a worker has take()n the previous item
val inpQueue: BlockingQueue[(Int, String)] = new ArrayBlockingQueue[(Int, String)](1)

val inpDS = Seq((1,"https://google.com/2X6barD"), (2,"https://google.com/3d9vCgW"), (3,"https://google.com/2M02Xz0"), (4,"https://google.com/2XOu2uL"), (5,"https://google.com/2AfBWF0"), (6,"https://google.com/36AEKsw"), (7,"https://google.com/3enBxz7"), (8,"https://google.com/36ABq0x"), (9,"https://google.com/2XBjmiF"), (10,"https://google.com/36Emlen"))


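val pool = Executors.newFixedThreadPool(3) // at most 3 tasks run concurrently
var i = 0
inpDS.foreach { ix =>
  inpQueue.put(ix) // blocks while the previous item is still waiting to be taken
  val t = new ConsumerAPIThread()
  t.setName("MyThread-" + i + " ")
  pool.execute(t) // the pool only calls t.run(); t is never started as its own thread
  i = i + 1
}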

println("Final Queue Size = " +inpQueue.size+"\n")


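class ConsumerAPIThread() extends Thread
{
  var name = ""

  override def run(): Unit = {
    val urlDetail = inpQueue.take()
    print(this.getName() + " " + Thread.currentThread().getName() + " popped " + urlDetail + " Queue Size " + inpQueue.size + " \n")
    triggerAPI((urlDetail._1, urlDetail._2))
  }

  def triggerAPI(params: (Int, String)): Unit = {
    try {
      // Read the response body and close the connection;
      // printing the Source object itself does not show the content
      val source = scala.io.Source.fromURL(params._2)
      try println(source.mkString)
      finally source.close()
    } catch {
      case ex: Exception =>
        println("Exception caught: " + ex)
    }
  }

  // Note: this is an ordinary method, not an auxiliary constructor, and it is never called
  def ConsumerAPIThread(s: String): Unit = {
    name = s
  }
}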
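A more common Scala pattern drops the shared BlockingQueue and lets each task carry its own (id, url) pair. The sketch below wraps a fixed pool of 3 threads in an ExecutionContext and fans out with Future.traverse. The names UrlCaller, fetchUrl and callAll and the example.com URLs are hypothetical; fetchUrl only simulates latency so the example runs offline, and in practice it would make the real request (for instance with scala.io.Source.fromURL).

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Try

object UrlCaller {
  // Hypothetical stand-in for the real HTTP call; it just simulates latency here
  def fetchUrl(url: String): String = {
    Thread.sleep(20)
    s"response from $url"
  }

  // Calls fetchUrl for every (id, url) pair with at most maxConcurrency calls in flight
  def callAll(urls: Seq[(Int, String)], maxConcurrency: Int): Seq[(Int, Try[String])] = {
    val pool = Executors.newFixedThreadPool(maxConcurrency)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // Each Future carries its own (id, url), so no shared queue is needed;
      // futures beyond maxConcurrency wait in the pool's internal task queue
      val all: Future[Seq[(Int, Try[String])]] =
        Future.traverse(urls) { case (id, url) =>
          Future((id, Try(fetchUrl(url)))) // a failed call becomes a Failure, not a crash
        }
      Await.result(all, 10.minutes) // results come back in input order
    } finally {
      pool.shutdown() // let the worker threads exit once all calls are done
    }
  }
}

val urls = (1 to 10).map(i => (i, s"https://example.com/item/$i"))
val results = UrlCaller.callAll(urls, maxConcurrency = 3)
results.foreach { case (id, r) => println(s"$id -> $r") }
```

Because the pool is shut down in the finally block, the notebook does not leave idle threads behind after each run.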
Tags: scala, apache-spark, functional-programming, databricks, blockingqueue
Answer (0 votes)

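You need to shut down the Executor once all the other work is done; otherwise its worker threads stay alive, waiting for new tasks.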

Try adding pool.shutdown() at the end of your program.
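Note that pool.shutdown() only stops the pool from accepting new tasks; tasks already submitted still run to completion, and the call returns immediately. If the notebook should also wait until every call has finished, shutdown() is usually paired with awaitTermination. A minimal sketch, assuming a one-minute timeout is acceptable; the helper names shutdownAndWait and demo and the simulated work are illustrative.

```scala
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object PoolShutdown {
  // Stop accepting tasks, wait for queued and running ones, force-stop if the timeout expires
  def shutdownAndWait(pool: ExecutorService, timeout: Long, unit: TimeUnit): Boolean = {
    pool.shutdown()
    val finished = pool.awaitTermination(timeout, unit)
    if (!finished) pool.shutdownNow() // interrupts tasks that are still running
    finished
  }

  // Hypothetical demo: 10 short tasks on a pool of 3, then a clean shutdown
  def demo(): (Int, Boolean, Boolean) = {
    val pool = Executors.newFixedThreadPool(3)
    val done = new AtomicInteger(0)
    for (_ <- 1 to 10) pool.execute(() => { Thread.sleep(10); done.incrementAndGet(); () })
    val finished = shutdownAndWait(pool, 1, TimeUnit.MINUTES)
    (done.get, finished, pool.isTerminated)
  }
}

val (done, finished, terminated) = PoolShutdown.demo()
println(s"tasks done = $done, finished in time = $finished")
```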

© www.soinside.com 2019 - 2024. All rights reserved.