(Spark Schedular) Spark 作业池中的公平和先进先出有什么区别?

问题描述 投票:0回答:1

我知道对于 Spark,我们可以将不同的池设置为公平或先进先出,并且行为可以不同。然而,在 fairscheduler.xml 中,我们还可以将单个池设置为 Fair 或 FIFO,我测试了几次,因为它们的行为似乎是相同的。然后我看了一下spark源码,调度算法是这样的:

/**
 * An interface for sort algorithm
 * FIFO: FIFO algorithm between TaskSetManagers
 * FS: FS algorithm between Pools, and FIFO or FS within Pools
 */
private[spark] trait SchedulingAlgorithm {
  def comparator(s1: Schedulable, s2: Schedulable): Boolean
}

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm{
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    res < 0
  }
}

private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm{
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble

    var compare = 0
    if (s1Needy && !s2Needy) {
      return true
    } else if (!s1Needy && s2Needy) {
      return false
    } else if (s1Needy && s2Needy) {
      compare = minShareRatio1.compareTo(minShareRatio2)
    } else {
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
    }
    if (compare < 0) {
      true
    } else if (compare > 0) {
      false
    } else {
      s1.name < s2.name
    }
  }
}

在fairSchedulingAlgorithm中,如果s1和s2来自同一个池,则minshare、runningtask和weight应该相同,这样我们总是可以得到返回值为false的值。所以它们不是公平的,而是先进先出的。我的 fairscheduler.xml 是这样的:

<?xml version="1.0"?>

    <allocations>
      <pool name="default">
        <schedulingMode>FAIR</schedulingMode>
        <weight>3</weight>
        <minShare>2</minShare>
      </pool>
      <pool name="cubepublishing">
          <schedulingMode>FAIR</schedulingMode>
          <weight>1</weight>
          <minShare>0</minShare>
      </pool>
    </allocations>

spark.scheduler.mode 是:

# job scheduler
spark.scheduler.mode              FAIR
spark.scheduler.allocation.file   conf/fairscheduler.xml

感谢您的帮助!

apache-spark bigdata job-scheduling
1个回答
0
投票

当您使用 Spark-Submit 或任何其他方式在集群中提交作业时,它将交给 Spark 调度程序,负责实现作业的逻辑计划。在 Spark 中,我们有两种模式。

1。先进先出 默认情况下,Spark 的调度程序以 FIFO 方式运行作业。每个作业分为多个阶段(例如映射和减少阶段),第一个作业在所有可用资源上获得优先级,而其阶段有任务要启动,然后第二个作业获得优先级,依此类推。如果队列头部的作业不需要使用整个集群,后面的作业可以立即开始运行,但是如果队列头部的作业很大,那么后面的作业可能会明显延迟。

2。公平 公平调度程序还支持将作业分组到池中,并为每个池设置不同的调度选项(例如权重)。例如,这对于为更重要的作业创建高优先级池很有用,或者将每个用户的作业分组在一起并为用户提供平等的份额,无论他们有多少并发作业,而不是为作业提供平等的份额。这种方法是根据 Hadoop Fair Scheduler 建模的。

在没有任何干预的情况下,新提交的作业会进入默认池,但可以通过将spark.scheduler.pool“本地属性”添加到提交作业的线程中的SparkContext来设置作业池。

© www.soinside.com 2019 - 2024. All rights reserved.