我在一个N个单线程工作者(在N台机器上)上运行了一个函数,其中client.map和其中一个worker失败了。我想知道是否有办法自动处理工作人员引发的异常,将失败的任务重新分配给其他工作人员,并忽略或从池中排除它?
我已尝试使用下面显示的方法模拟问题。为了使一个工人失败,我在my_function
上对它提出了一个OSError,它被提交给client.map
,如:futures = client.map(my_function, range(100))
。在我的例子中,'Computer123'上的工作人员将是失败者。要处理my_function
抛出的异常,我在exception_handler
中使用sys.exit。因此,当一个任务在一个worker上失败时,会调用sys.exit。结果是,当客户端重新分配其失败的任务时,坏工作者的distributed.nanny捕获失败并重新启动工作程序。但是,一旦坏工人再次恢复,它就会再次接收任务,因为它仍在池中。它再次失败,重复该过程。随着它继续失败,最终其他工人完成了所有任务。如果我可以自动处理像“Computer123”这样的坏工作者的异常并将其从池中删除,那将是理想的选择。也许我需要将它从池中移除?
@exception_handler
def my_function(x):
import socket
import time
time.sleep(5)
if socket.gethostname() == 'Computer123':
raise(OSError)
else:
return x**2
def exception_handler(orig_func):
def wrapper(*args,**kwargs):
try:
return orig_func(*args,**kwargs)
except:
import sys
sys.exit(1)
return wrapper
作为一种解决方法,您可以保留一个坏工作人员的字典,每当您确定它是坏的时候(可能在它引发一定数量的例外之后)就为其添加主机名。
然后,当您要发布某个任务时,请检查它是否在违规列表中。就像是:
if socket.gethostname() in badHosts:
skip
else:
do_something()
如果您可以提供有关如何管理所连接池的更多详细信息,我可以提供有关如何直接删除它们的更多建议,而不必每次都进行检查。