Dask 分布式 - 由工作方法共享的有状态全局参数

问题描述 投票:0回答:1

我正在使用 Dask 设置集群。现在我正在本地主机上设置调度程序和工作人员。

cluster = SSHCluster(["localhost", "localhost"],
                    connect_options={"known_hosts": None},
                    worker_options={"n_workers": params["n_workers"], },
                    scheduler_options={"port": 0, "dashboard_address": ":8797"},)

client = Client(cluster)

有没有办法创建有状态的全局参数,可以在工作人员端初始化并由随后分配在工作人员上计算的任何worker_methods使用?

我找到了 client.register_worker_plugin 方法。

class ReadOnlyData(WorkerPlugin):
    def __init__(self, jsonfilepath):
        with open(jsonfilepath, "r") as readfile:
            self.persons = jsonload(read_file)
            self.persons_len = len(self.persons)

def main():
    cluster = SSHCluster(params) # simplified
    client = Client(cluster)
    plugin = ReadOnlyData(jsonfilepath)
    client.register_worker_plugin(plugin, name="read-only-data")

但是,ReadOnlyData 是在客户端初始化的,因此 self.persons 和 self.persons_len 被复制到工作人员(而不是在工作人员一侧初始化)。虽然这对于小数据可能很有用,但如果数据集很大,这将产生额外的复制通信开销(除非我在概念上遗漏了一些东西)。

假设 ReadOnlyData 和“jsonfilepath”中的文件在工作人员端可用。我们可以从具有一些任意逻辑的“worker_method_1”和“worker_method_2”中调用它;然而,这意味着每次调用工人时都必须调用它。是否有一些“初始化”事件/方法发生在工作人员一侧,在工作人员创建之后和工作人员方法分配之前,这将允许我们预先加载一些数据结构作为有状态的全局参数,通常共享在工作方法的后续实例中?

python cluster-computing dask dask-distributed
1个回答
0
投票

有很多方法可以做到这种事情,但我建议最简单的也是最好的。

def read_only_data(jsonfilepath):
    with open(jsonfilepath, "r") as readfile:
        return jsonload(readfile)

future = client.submit(readonlydata, mypath)
client.replicate(future)

现在将将来时用在其他事情上。当 future 及其依赖项被释放时(例如,

del future
),worker 上的数据将被释放。

其他选项包括

client.run()
(强制对所有工作人员执行功能)、
distributed.Variable
以及工作人员插件。但请记住:如果您使用普通 dask(例如,延迟),则需要将数据存储在另一个工作线程上的任务,它将根据需要进行复制,而无需您执行任何操作。

© www.soinside.com 2019 - 2024. All rights reserved.