我正在使用 Dask 设置集群。现在我正在本地主机上设置调度程序和工作人员。
cluster = SSHCluster(["localhost", "localhost"],
connect_options={"known_hosts": None},
worker_options={"n_workers": params["n_workers"], },
scheduler_options={"port": 0, "dashboard_address": ":8797"},)
client = Client(cluster)
有没有办法创建有状态的全局参数,可以在工作人员端初始化并由随后分配在工作人员上计算的任何worker_methods使用?
我找到了 client.register_worker_plugin 方法。
class ReadOnlyData(WorkerPlugin):
def __init__(self, jsonfilepath):
with open(jsonfilepath, "r") as readfile:
self.persons = jsonload(read_file)
self.persons_len = len(self.persons)
def main():
cluster = SSHCluster(params) # simplified
client = Client(cluster)
plugin = ReadOnlyData(jsonfilepath)
client.register_worker_plugin(plugin, name="read-only-data")
但是,ReadOnlyData 是在客户端初始化的,因此 self.persons 和 self.persons_len 被复制到工作人员(而不是在工作人员一侧初始化)。虽然这对于小数据可能很有用,但如果数据集很大,这将产生额外的复制通信开销(除非我在概念上遗漏了一些东西)。
假设 ReadOnlyData 和“jsonfilepath”中的文件在工作人员端可用。我们可以从具有一些任意逻辑的“worker_method_1”和“worker_method_2”中调用它;然而,这意味着每次调用工人时都必须调用它。是否有一些“初始化”事件/方法发生在工作人员一侧,在工作人员创建之后和工作人员方法分配之前,这将允许我们预先加载一些数据结构作为有状态的全局参数,通常共享在工作方法的后续实例中?
有很多方法可以做到这种事情,但我建议最简单的也是最好的。
def read_only_data(jsonfilepath):
with open(jsonfilepath, "r") as readfile:
return jsonload(readfile)
future = client.submit(readonlydata, mypath)
client.replicate(future)
现在将将来时用在其他事情上。当 future 及其依赖项被释放时(例如,
del future
),worker 上的数据将被释放。
其他选项包括
client.run()
(强制对所有工作人员执行功能)、distributed.Variable
以及工作人员插件。但请记住:如果您使用普通 dask(例如,延迟),则需要将数据存储在另一个工作线程上的任务,它将根据需要进行复制,而无需您执行任何操作。