集群上数据的Dask和持久化

问题描述 投票:0回答:1

我正在开发一个使用历史数据和传入数据进行分析的项目。我想了解如何管理 dask 上传入数据的更新,而不必每次都分派所有历史数据。

我收集时间序列数据进行分析,但时间序列随着传入数据而增长,并且每个流的传入数据需要发送到适当的工作人员以进行 ARMA 分析等操作。如果我对天气进行 ARMA 分析,我希望将气压与温度分开,并通过比较压力与压力和温度与温度来进行分析。我不想将新的温度数据附加到先前的温度数据并将现在更大的系列分派给新工作人员。我只想将新的温度数据发送给已经拥有所有先前温度数据的 dask 工作人员,依此类推。如何确保先前的温度数据保留在工作人员上,以及如何将新的温度数据(仅)发送给拥有先前数据的工作人员。

我已经用 dask 做了一些基本的事情,但是所有的基本课程都没有解决历史方法对结果的仅工人持久性的持久性。

此外,这些数据不是基于Dask系列或数据帧,而是基于保存与分析方法相关的不同数据和方法的类。所以我无法有效地使用 dask 系列或数据框。

如有任何帮助,我们将不胜感激

python python-3.x dask directed-acyclic-graphs dask-distributed
1个回答
2
投票

这可能不是正确的解决方案,但一种可能性是指定特定的工作人员来执行特定的计算。例如,让我们将工人分为两组:

# instantiate workers
from distributed import Client
c = Client(n_workers=5)

# here the separation is done based on order
# but custom logic can be implemented instead
workers_pressure = list(c.scheduler_info()['workers'])[3:]
workers_temperature = list(c.scheduler_info()['workers'])[:3]

现在,对于与

pressure
相关的任务,我们可以指定与
pressure
相关的工作人员:

data_pressure = [4,5,6]
data_temperature = [1,2,3]

# scatter data to pressure/temperature workers
d_p = client.scatter(data_pressure, workers= workers_pressure)
d_t = client.scatter(data_temperature, workers=workers_temperature)

# submit computations to specific workers
function_pressure = lambda x: x**2
function_temperature = lambda x: x**2

f_p = client.map(function_pressure, d_p, workers=workers_pressure)
f_t = client.map(function_temperature, d_t, workers= workers_temperature)

在上面的代码片段中,指定处理压力数据的工作人员将用于运行压力计算。

如果您有一组非常异构的任务,这将无法很好地扩展。如果这是您的情况,我会首先构建任务图 (DAG),然后让

dask
处理最有效的工作人员任务分配。

© www.soinside.com 2019 - 2024. All rights reserved.