Close the dask client and cluster when any error or exception is caught


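I am writing a Python function that uses dask for data processing. If any error or exception occurs, I want the dask cluster and client to be closed automatically, so I use a "with ... as" statement. The function is structured like this: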

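import dask.array as da
from dask.distributed import Client, LocalCluster

def func(input: str,   # path to input
         output: str,  # path to output
         ):
    # Both objects are closed when the with block exits, normally or via an exception
    with LocalCluster() as cluster, Client(cluster) as client:
        # load the input with dask
        # set up computing graph
        da.compute([...])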

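If I send a keyboard interrupt while the data is being processed, i.e. during da.compute(), the cluster and client are closed successfully. However, if I interrupt while the program is still setting up the cluster, i.e. while LocalCluster() is being called, the cluster is not closed. I get something like this: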

......
KeyboardInterrupt: 
2023-10-20 18:55:38,520 - distributed.nanny - WARNING - Restarting worker
2023-10-20 18:55:38,523 - distributed.nanny - WARNING - Restarting worker
2023-10-20 18:55:38,529 - distributed.nanny - WARNING - Restarting worker
2023-10-20 18:55:38,532 - distributed.nanny - WARNING - Restarting worker
2023-10-20 18:55:38,535 - distributed.nanny - WARNING - Restarting worker
2023-10-20 18:55:38,548 - distributed.nanny - WARNING - Restarting worker
2023-10-20 18:55:38,549 - distributed.nanny - WARNING - Restarting worker
2023-10-20 18:55:38,556 - distributed.nanny - WARNING - Restarting worker

The workers did not stop.

So the next time I run this function, I get:

/users/kangl/miniconda3/envs/work/lib/python3.10/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 37963 instead
  warnings.warn(

because the previous cluster was never shut down.

Is there a way to shut down the cluster automatically when the interrupt arrives while the LocalCluster is still being created?

python dask dask-distributed
1 answer

I'm fairly sure this would require deep changes to the distributed codebase to achieve.

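The way with works is that the context is only set up after LocalCluster() has finished: Python first evaluates the LocalCluster() call, which is where the slow cluster startup happens, and only then calls __enter__ on the returned object. __exit__ is only guaranteed to run once __enter__ has been reached, so if you interrupt before that, there is no context yet that can be exited safely.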
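To make that ordering concrete, here is a minimal pure-Python sketch. FakeCluster is a hypothetical stand-in for LocalCluster (it is not dask's API), and raising KeyboardInterrupt by hand simulates pressing Ctrl+C. An interrupt inside the with body triggers __exit__, while an interrupt inside the constructor happens before __enter__ and __exit__ are ever reached.

```python
class FakeCluster:
    """Hypothetical stand-in for LocalCluster; records lifecycle events."""

    def __init__(self, events, fail_in_init=False):
        self.events = events
        self.events.append("init started")
        if fail_in_init:
            # Simulate Ctrl+C arriving while the constructor is still running
            raise KeyboardInterrupt
        self.events.append("init finished")

    def __enter__(self):
        self.events.append("enter")
        return self

    def __exit__(self, exc_type, exc, tb):
        # Cleanup (closing the cluster) would happen here;
        # returning False lets the exception propagate
        self.events.append("exit")
        return False


def run(fail_in_init, fail_in_body):
    events = []
    try:
        with FakeCluster(events, fail_in_init) as cluster:
            if fail_in_body:
                raise KeyboardInterrupt  # like Ctrl+C during da.compute()
    except KeyboardInterrupt:
        events.append("interrupted")
    return events


# Interrupt in the body: __exit__ runs, so the cluster would be closed
print(run(fail_in_init=False, fail_in_body=True))
# ['init started', 'init finished', 'enter', 'exit', 'interrupted']

# Interrupt in the constructor: neither __enter__ nor __exit__ runs
print(run(fail_in_init=True, fail_in_body=False))
# ['init started', 'interrupted']
```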

One thing you could try:

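from dask import compute
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=0)  # plus any other options you need
with cluster:
    # Starting the workers now happens inside the context, so an interrupt
    # here still triggers cluster.__exit__, which closes the cluster
    cluster.scale(...)  # the number of workers you want
    # cluster.wait_for_workers(...) # if you need it
    with Client(cluster) as client:
        compute(...)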

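Now the slow part, bringing up the workers, happens inside the context, so an interrupt during it still closes the cluster.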
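A toy model of this workaround, assuming the expensive work can be moved out of the constructor into a scale() call. FakeCluster below is a hypothetical illustration, not dask's LocalCluster, and its interrupt flag exists only to simulate Ctrl+C. Because the object already exists and the with statement has already entered it before the slow step starts, an interrupt during that step still reaches __exit__ and closes the cluster.

```python
class FakeCluster:
    """Hypothetical stand-in for LocalCluster(n_workers=0): the constructor
    is cheap, and the slow worker startup happens in scale()."""

    def __init__(self, events):
        self.events = events
        self.closed = False

    def scale(self, n, interrupt=False):
        # 'interrupt' is a hypothetical flag used only to simulate Ctrl+C
        self.events.append(f"starting {n} workers")
        if interrupt:
            raise KeyboardInterrupt
        self.events.append("workers started")

    def close(self):
        self.closed = True
        self.events.append("closed")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False  # do not swallow the interrupt


def run(interrupt):
    events = []
    cluster = FakeCluster(events)  # fast: nothing slow happens here
    try:
        with cluster:
            cluster.scale(4, interrupt=interrupt)  # slow step, inside the context
            events.append("compute")
    except KeyboardInterrupt:
        events.append("interrupted")
    return events, cluster.closed


print(run(interrupt=True))
# (['starting 4 workers', 'closed', 'interrupted'], True)
```

As far as I know, the real LocalCluster still starts its scheduler inside the constructor even with n_workers=0, so this approach narrows the unprotected window rather than eliminating it.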
