如何忽略任务失败的工作人员并将其任务重新分配给其他工作人员?

问题描述 投票:0回答:1

我在一个N个单线程工作者(在N台机器上)上运行了一个函数,其中client.map和其中一个worker失败了。我想知道是否有办法自动处理工作人员引发的异常,将失败的任务重新分配给其他工作人员,并忽略或从池中排除它?

我已尝试使用下面显示的方法模拟问题。为了使一个工人失败,我在my_function上对它提出了一个OSError,它被提交给client.map,如:futures = client.map(my_function, range(100))。在我的例子中,'Computer123'上的工作人员将是失败者。要处理my_function抛出的异常,我在exception_handler中使用sys.exit。因此,当一个任务在一个worker上失败时,会调用sys.exit。结果是,当客户端重新分配其失败的任务时,坏工作者的distributed.nanny捕获失败并重新启动工作程序。但是,一旦坏工人再次恢复,它就会再次接收任务,因为它仍在池中。它再次失败,重复该过程。随着它继续失败,最终其他工人完成了所有任务。如果我可以自动处理像“Computer123”这样的坏工作者的异常并将其从池中删除,那将是理想的选择。也许我需要将它从池中移除?

@exception_handler
def my_function(x):
  import socket 
  import time
  time.sleep(5)
  if socket.gethostname() == 'Computer123':
    raise(OSError)
  else:
    return x**2

def exception_handler(orig_func):
  def wrapper(*args,**kwargs):
    try:
      return orig_func(*args,**kwargs)
    except:
      import sys
      sys.exit(1)
  return wrapper
python distributed-computing dask dask-distributed
1个回答
0
投票

作为一种解决方法,您可以保留一个坏工作人员的字典,每当您确定它是坏的时候(可能在它引发一定数量的例外之后)就为其添加主机名。

然后,当您要发布某个任务时,请检查它是否在违规列表中。就像是:

  if socket.gethostname() in badHosts:
    skip
  else:
    do_something()

如果您可以提供有关如何管理所连接池的更多详细信息,我可以提供有关如何直接删除它们的更多建议,而不必每次都进行检查。

© www.soinside.com 2019 - 2024. All rights reserved.