我需要以编程方式创建远程工作者并将其用于任务,然后将其关闭。
文档中给出的示例代码适用于所写内容:
import asyncio
from distributed import Worker, Scheduler, Client
from distributed.scheduler import WorkerState
s = "x.x.x.x:8786" # remote IP, not local, started from command line.
async def f():
async with Worker(s) as w1, Worker(s) as w2:
async with Client(s, asynchronous=True) as client:
future = client.submit(lambda x: x + 1, 10)
result = await future
print(result)
asyncio.get_event_loop().run_until_complete(f())
假设我有n
个不同于dask调度程序的计算机-ip1, ip2, ..ipn
。现在,我面临着两个问题:
ip1, ip2, ip3
。在创建host
时尝试使用contact_address
和Worker
参数。工作人员从Scheduler的本地本身开始,而不是所需的机器。如何在连接到同一调度程序的所需计算机中远程启动工作程序?client
函数中创建的async
才能随时间用于多个submit
,map
调用。我也有很多自定义的python函数。因此,我如何以编程方式在不同的计算机上创建工作器,创建client
并在异步功能之外逐步使用它。我尝试了以下操作,但未成功。
s_address = "x.x.x.x:8786" # remote scheduler IP
async def f():
async with Worker(s_address) as w1, Worker(s_address) as w2:
async with Client(s_address, asynchronous=True) as client:
return client
client_to_use = f() # expecting client object which I can use and...
# ...when everything finishes, hoping context manager kills the workers.
# This clearly doesn't work
asyncio.get_event_loop().run_until_complete(f()) # not sure if this is valid anymore
# What I need to do
custom_module.call_some_fn_to_use_dask_client(client_to_use) # Does not work as well!! ```
您应该将可供使用的各种选项读入setup dask。简而言之,您需要一种方法来与要在其上启动工作程序的机器进行对话。调度程序不知道如何执行此操作,本地客户端也不知道如何执行此操作,您需要自己选择机制。它可以像登录到远程机器并启动工作进程(即运行python)一样简单,但是有一些更复杂的系统,例如超级计算机调度程序,yarn和kubernetes。
Worker
的文档清楚表明您正在实例化here调用发生的位置。
在进一步介绍之前,您可能应该考虑要达到的目标,并进行整理……