Dask以编程方式启动远程工作者

问题描述 投票:0回答:1

我需要以编程方式创建远程工作者并将其用于任务,然后将其关闭。

文档中给出的示例代码适用于所写内容:

import asyncio
from distributed import Worker, Scheduler, Client
from distributed.scheduler import WorkerState

s = "x.x.x.x:8786" # remote IP, not local, started from command line.

async def f():
    async with Worker(s) as w1, Worker(s) as w2:
        async with Client(s, asynchronous=True) as client:
            future = client.submit(lambda x: x + 1, 10)
            result = await future
            print(result)

asyncio.get_event_loop().run_until_complete(f())

假设我有n个不同于dask调度程序的计算机-ip1, ip2, ..ipn。现在,我面临着两个问题:

  1. 连接到远程调度程序后,我想在多台计算机上创建工作线程。假设ip1, ip2, ip3。在创建host时尝试使用contact_addressWorker参数。工作人员从Scheduler的本地本身开始,而不是所需的机器。如何在连接到同一调度程序的所需计算机中远程启动工作程序?
  2. 我需要在client函数中创建的async才能随时间用于多个submitmap调用。我也有很多自定义的python函数。因此,我如何以编程方式在不同的计算机上创建工作器,创建client并在异步功能之外逐步使用它。我尝试了以下操作,但未成功。

s_address = "x.x.x.x:8786" # remote scheduler IP

async def f():
    async with Worker(s_address) as w1, Worker(s_address) as w2:
        async with Client(s_address, asynchronous=True) as client:
            return client

client_to_use = f() # expecting client object which I can use and...
                    # ...when everything finishes, hoping context manager kills the workers.
                    # This clearly doesn't work
asyncio.get_event_loop().run_until_complete(f()) # not sure if this is valid anymore

# What I need to do
custom_module.call_some_fn_to_use_dask_client(client_to_use) # Does not work as well!! ```
python dask dask-distributed
1个回答
0
投票

您应该将可供使用的各种选项读入setup dask。简而言之,您需要一种方法来与要在其上启动工作程序的机器进行对话。调度程序不知道如何执行此操作,本地客户端也不知道如何执行此操作,您需要自己选择机制。它可以像登录到远程机器并启动工作进程(即运行python)一样简单,但是有一些更复杂的系统,例如超级计算机调度程序,yarn和kubernetes。

Worker的文档清楚表明您正在实例化here调用发生的位置。

在进一步介绍之前,您可能应该考虑要达到的目标,并进行整理……

© www.soinside.com 2019 - 2024. All rights reserved.