我正在使用 DaskTaskRunner
使用
Prefect运行工作流程,它创建并保存一个
dask.distibuted.LocalCluster
实例。
在完美的任务中,我使用 dask_ml.RandomSearchCV
并安装它,根据我的理解,这应该使用 LocalCluster
来实现。
当拟合因任何原因失败时(即由
LocalCluster
提交给 RandomizedSearchCV
的 dask-task 失败),错误会被打印,但不会传播到外部 prefect 任务。配件无休止地运行(并失败)。
一个最小的例子
from distributed import Client
from prefect import task, flow
from prefect_dask import DaskTaskRunner
from dask_ml.model_selection import RandomizedSearchCV
from sklearn.ensemble import RandomForestRegressor
import pandas as pd
@task
def sample_task():
X = pd.DataFrame({"A": [1, 2, 3], "B": ["str", 1, 2], "C": [0, 0, 0]})
y = X["C"]
estimator = RandomizedSearchCV(
estimator=RandomForestRegressor(),
param_distributions={'max_depth': [3, 5, 7]},
)
estimator.fit(X, y)
@flow
def sample_flow():
sample_task.submit()
print("This should not be printed")
sample_flow.with_options(
task_runner=DaskTaskRunner(
cluster_class="dask.distributed.LocalCluster"
)
)()
正如预期的那样,由于
X
内部有字符串,拟合失败。然而,我希望完美的任务sample_task()
也会失败,但事实并非如此。
dask 任务的重试限制可能也可以做到这一点,但这些是由
RandomizedSearchCV
提交的。
有什么方法可以传播该错误,以便我的完美任务也失败?
dask_ml.model_selection.RandomizedSearchCV 会引发警告而不是错误,因此 Prefect 不会阻止 dask-ml 重试。
您可以在此处
查看dask-ml代码