或工具 cp 模型性能,用于灵活的作业车间调度和过渡

问题描述 投票:0回答:1

我正在基于示例评估 OR 工具 cp 模型性能 https://github.com/google/or-tools/blob/stable/examples/contrib/scheduling_with_transitions_sat.py

我修改了代码以生成随机作业,并了解运行时间以检索第一个和“可接受的”解决方案 - “可接受”意味着时间表中没有太多/大的空整体。

# -*- coding: utf-8 -*-
"""Scheduling problem with transition time between tasks and transitions costs.

@author: CSLiu2
"""


import argparse
import collections
import random

from ortools.sat.python import cp_model
from google.protobuf import text_format

#----------------------------------------------------------------------------
# Command line arguments.
PARSER = argparse.ArgumentParser()
PARSER.add_argument(
    '--problem_instance', default=0, type=int, help='Problem instance.')
PARSER.add_argument(
    '--output_proto',
    default='',
    help='Output file to write the cp_model proto to.')
PARSER.add_argument('--params', default='', help='Sat solver parameters.')


#----------------------------------------------------------------------------
# Intermediate solution printer
class SolutionPrinter(cp_model.CpSolverSolutionCallback):
  """Print intermediate solutions."""

  def __init__(self, makespan):
    cp_model.CpSolverSolutionCallback.__init__(self)
    self.__solution_count = 0
    self.__makespan = makespan

  def OnSolutionCallback(self):
    print('Solution %i, time = %f s, objective = %i, makespan = %i' %
          (self.__solution_count, self.WallTime(), self.ObjectiveValue(),
           self.Value(self.__makespan)))
    self.__solution_count += 1


def main(args):
  """Solves the scheduling with transitions problem."""

  instance = args.problem_instance
  parameters = args.params
  output_proto = args.output_proto

  #----------------------------------------------------------------------------
  # Data.
  # get random jobs
  jobs = []
  sum_tasks = 0

  num_machines = 8
  for i in range(0, 30): # create jobs
      tasks = []
      for j in range(0, random.randint(3, 5)): # create tasks per job
          alternatives = []
          sum_tasks += 1
          machines = list(range(0, num_machines)) # list of machines to randomly pick one for each task
          for k in range(0, 2): # create two alternatives per task
              machineInd = random.randint(0,len(machines)-1)
              alternatives.append((random.randint(6, 35), machines[machineInd], 'R'+str(random.randint(0, 6))))
              del machines[machineInd]
          tasks.append(alternatives)
      jobs.extend([tasks])

  print('total number of tasks: %i' % sum_tasks)
  print('total number of jobs: %i' % len(jobs))
  #----------------------------------------------------------------------------
  # Helper data.
  num_jobs = len(jobs)
  all_jobs = range(num_jobs)
  all_machines = range(num_machines)

  #----------------------------------------------------------------------------
  # Model.
  model = cp_model.CpModel()

  #----------------------------------------------------------------------------
  # Compute a maximum makespan greedily.
  horizon = 0
  for job in jobs:
    for task in job:
      max_task_duration = 0
      for alternative in task:
        max_task_duration = max(max_task_duration, alternative[0])
      horizon += max_task_duration

  print('Horizon = %i' % horizon)

  #----------------------------------------------------------------------------
  # Global storage of variables.
  intervals_per_machines = collections.defaultdict(list)
  presences_per_machines = collections.defaultdict(list)
  starts_per_machines = collections.defaultdict(list)
  ends_per_machines = collections.defaultdict(list)
  resources_per_machines = collections.defaultdict(list)
  ranks_per_machines = collections.defaultdict(list)

  job_starts = {}  # indexed by (job_id, task_id).
  job_presences = {}  # indexed by (job_id, task_id, alt_id).
  job_ranks = {}  # indexed by (job_id, task_id, alt_id).
  job_ends = []  # indexed by job_id

  #----------------------------------------------------------------------------
  # Scan the jobs and create the relevant variables and intervals.
  for job_id in all_jobs:
    job = jobs[job_id]
    num_tasks = len(job)
    previous_end = None
    for task_id in range(num_tasks):
      task = job[task_id]

      min_duration = task[0][0]
      max_duration = task[0][0]

      num_alternatives = len(task)
      all_alternatives = range(num_alternatives)

      for alt_id in range(1, num_alternatives):
        alt_duration = task[alt_id][0]
        min_duration = min(min_duration, alt_duration)
        max_duration = max(max_duration, alt_duration)

      # Create main interval for the task.
      suffix_name = '_j%i_t%i' % (job_id, task_id)
      start = model.NewIntVar(0, horizon, 'start' + suffix_name)
      duration = model.NewIntVar(min_duration, max_duration,
                                 'duration' + suffix_name)
      end = model.NewIntVar(0, horizon, 'end' + suffix_name)

      # Store the start for the solution.
      job_starts[(job_id, task_id)] = start

      # Add precedence with previous task in the same job.
      if previous_end is not None:
        model.Add(start >= previous_end)
      previous_end = end

      # Create alternative intervals.
      l_presences = []
      for alt_id in all_alternatives:
        ### add to link interval with eqp constraint.
        ### process time = -1 cannot be processed at this machine.
        if jobs[job_id][task_id][alt_id][0] == -1:
          continue
        alt_suffix = '_j%i_t%i_a%i' % (job_id, task_id, alt_id)
        l_presence = model.NewBoolVar('presence' + alt_suffix)
        l_start = model.NewIntVar(0, horizon, 'start' + alt_suffix)
        l_duration = task[alt_id][0]
        l_end = model.NewIntVar(0, horizon, 'end' + alt_suffix)
        l_interval = model.NewOptionalIntervalVar(
            l_start, l_duration, l_end, l_presence, 'interval' + alt_suffix)
        l_rank = model.NewIntVar(-1, 1000, 'rank' + alt_suffix)
        l_presences.append(l_presence)
        l_machine = task[alt_id][1]
        l_type = task[alt_id][2]

        # Link the original variables with the local ones.
        model.Add(start == l_start).OnlyEnforceIf(l_presence)
        model.Add(duration == l_duration).OnlyEnforceIf(l_presence)
        model.Add(end == l_end).OnlyEnforceIf(l_presence)

        # Add the local variables to the right machine.
        intervals_per_machines[l_machine].append(l_interval)
        starts_per_machines[l_machine].append(l_start)
        ends_per_machines[l_machine].append(l_end)
        presences_per_machines[l_machine].append(l_presence)
        resources_per_machines[l_machine].append(l_type)
        ranks_per_machines[l_machine].append(l_rank)

        # Store the variables for the solution.
        job_presences[(job_id, task_id, alt_id)] = l_presence
        job_ranks[(job_id, task_id, alt_id)] = l_rank

      # Only one machine can process each lot.
      model.Add(sum(l_presences) == 1)

    job_ends.append(previous_end)

  #----------------------------------------------------------------------------
  # Create machines constraints nonoverlap process
  for machine_id in all_machines:
    intervals = intervals_per_machines[machine_id]
    if len(intervals) > 1:
      model.AddNoOverlap(intervals)

  #----------------------------------------------------------------------------
  # Transition times and transition costs using a circuit constraint.
  switch_literals = []
  for machine_id in all_machines:
    machine_starts = starts_per_machines[machine_id]
    machine_ends = ends_per_machines[machine_id]
    machine_presences = presences_per_machines[machine_id]
    machine_resources = resources_per_machines[machine_id]
    machine_ranks = ranks_per_machines[machine_id]
    intervals = intervals_per_machines[machine_id]
    arcs = []
    num_machine_tasks = len(machine_starts)
    all_machine_tasks = range(num_machine_tasks)

    for i in all_machine_tasks:
      # Initial arc from the dummy node (0) to a task.
      start_lit = model.NewBoolVar('')
      arcs.append([0, i + 1, start_lit])
      # If this task is the first, set both rank and start to 0.
      model.Add(machine_ranks[i] == 0).OnlyEnforceIf(start_lit)
      model.Add(machine_starts[i] == 0).OnlyEnforceIf(start_lit)
      # Final arc from an arc to the dummy node.
      arcs.append([i + 1, 0, model.NewBoolVar('')])
      # Self arc if the task is not performed.
      arcs.append([i + 1, i + 1, machine_presences[i].Not()])
      model.Add(machine_ranks[i] == -1).OnlyEnforceIf(
          machine_presences[i].Not())

      for j in all_machine_tasks:
        if i == j:
          continue

        lit = model.NewBoolVar('%i follows %i' % (j, i))
        arcs.append([i + 1, j + 1, lit])
        model.AddImplication(lit, machine_presences[i])
        model.AddImplication(lit, machine_presences[j])

        # Maintain rank incrementally.
        model.Add(machine_ranks[j] == machine_ranks[i] + 1).OnlyEnforceIf(lit)

        # Compute the transition time if task j is the successor of task i.
        if machine_resources[i] != machine_resources[j]:
          transition_time = 3
          switch_literals.append(lit)
        else:
          transition_time = 0
        # We add the reified transition to link the literals with the times
        # of the tasks.
        model.Add(machine_starts[j] == machine_ends[i] +
                  transition_time).OnlyEnforceIf(lit)
    if arcs:
        model.AddCircuit(arcs)

  #----------------------------------------------------------------------------
  # Objective.
  makespan = model.NewIntVar(0, horizon, 'makespan')
  model.AddMaxEquality(makespan, job_ends)
  makespan_weight = 1
  transition_weight = 5
  #model.Minimize(makespan * makespan_weight +
  #               sum(switch_literals) * transition_weight)
  model.Minimize(makespan)

  #----------------------------------------------------------------------------
  # Write problem to file.
  if output_proto:
    print('Writing proto to %s' % output_proto)
    with open(output_proto, 'w') as text_file:
      text_file.write(str(model))

  #----------------------------------------------------------------------------
  # Solve.
  solver = cp_model.CpSolver()
  solver.parameters.num_search_workers = 16
  solver.parameters.max_time_in_seconds = 60 * 60 * 2
  if parameters:
    text_format.Merge(parameters, solver.parameters)
  solution_printer = SolutionPrinter(makespan)
  status = solver.Solve(model, solution_printer)

  #----------------------------------------------------------------------------
  # Print solution.
  if status == cp_model.FEASIBLE or status == cp_model.OPTIMAL:
    for job_id in all_jobs:
      for task_id in range(len(jobs[job_id])):
        start_value = solver.Value(job_starts[(job_id, task_id)])
        machine = 0
        duration = 0
        select = 0
        rank = -1

        for alt_id in range(len(jobs[job_id][task_id])):
          if jobs[job_id][task_id][alt_id][0] == -1:
            continue

          if solver.BooleanValue(job_presences[(job_id, task_id, alt_id)]):
            duration = jobs[job_id][task_id][alt_id][0]
            machine = jobs[job_id][task_id][alt_id][1]
            select = alt_id
            rank = solver.Value(job_ranks[(job_id, task_id, alt_id)])

        print(
            '  Job %i starts at %i (alt %i, duration %i) with rank %i on machine %i'
            % (job_id, start_value, select, duration, rank, machine))

    print('Solve status: %s' % solver.StatusName(status))
    print('Objective value: %i' % solver.ObjectiveValue())
    print('Makespan: %i' % solver.Value(makespan))


if __name__ == '__main__':
  main(PARSER.parse_args())

我正在 16 vCPU 机器上进行测试,我观察到以下性能:

  • 任务总数略低于 100 个时,优化器通常会在几秒钟内响应第一个解决方案。有时,检索第一个解决方案需要长达 60 到 90 秒的时间
  • 任务总数略高于 100 个(很可能使用上述参数),性能会下降,并且优化器在很长时间内不会返回第一个解决方案。我让它运行一次,500 秒后它返回第一个解决方案。

现在我的问题是:

  1. 这样的表现值得期待吗?或者我是否在监督代码中的某种错误配置?

  2. 总的来说,在这个例子中我可以做一些性能调整吗?

  3. 在这样的作业车间调度任务中,并且有一台 16 CPU 的机器运行优化,您认为“可实现”的性能是多少?更具体地说,我希望优化器在大约 15 秒内返回第一个解决方案,并在 60-90 秒内返回“可接受”的解决方案。在这个具有过渡时间的灵活作业车间调度任务中(甚至可能包含每个任务的截止日期),您认为优化器可以处理多少个作业、任务、替代方案、机器,以便在此特定机器上的所需时间范围内做出响应?

非常感谢您的反馈!

python job-scheduling or-tools
1个回答
0
投票

与顺序相关的设置时间(灵活)作业车间是非常困难的问题。 即使是中等规模的问题(100 个任务),也不要指望有任何最优性证明。

使用 CP-SAT 可以做的最有影响力的事情是编写一个贪婪的第一个解决方案启发式,并向求解器提示解决方案。调度启发式方法不适用于此类问题。

注意:有(一些)技巧可以提高性能。

  • 计算一个间隔与所有其他间隔之间的最小转换,并将该最小持续时间从转换转移到间隔的持续时间是一个很好的方法。
  • 第二个是在 jobshop_sat.cc 中实现的。它创建一个完工时间间隔(从完工时间到地平线)。这有助于充满活力的推理。
© www.soinside.com 2019 - 2024. All rights reserved.