延迟延迟错误-AttributeError:'_ thread._local'对象没有属性'value'

问题描述 投票:1回答:1

我一直在绞尽脑汁想弄清楚为什么我无法在Dask上执行此可并行化的功能。本质上,我有一个函数可以加载keras模型(我使用mlflow存储模型),然后对要批量发送的某些输入数据使用模型的预测方法。这段代码(如下)导致以下错误:

AttributeError: '_thread._local' object has no attribute 'value'

代码示例:

@delayed
def load_and_predict(input_data_chunk):

    def contrastive_loss(y_true, y_pred):
            margin = 1
            square_pred = K.square(y_pred)
            margin_square = K.square(K.maximum(margin - y_pred, 0))
            return K.mean(y_true * square_pred + (1 - y_true) * margin_square)

    mlflow.set_tracking_uri('<tracking_uri>')
    mlflow.set_experiment('experiment_name')
    runs = mlflow.search_runs()
    artifact_uri = runs.loc[runs['start_time'].idxmax()]['artifact_uri']
    model = mlflow.keras.load_model(artifact_uri + '/model', custom_objects={'contrastive_loss': contrastive_loss})
    y_pred = model.predict(input_data_chunk)
    return y_pred

with Client(<scheduler_ip:port>) as client:
    batch_array = np.array_split(X_test, 10)
    results = []
    for batch in batch_array:
        prediction = load_and_predict(batch)
        results.append(prediction)

compute(*results)

我对Dask来说还很陌生,因此,对此问题的任何专家指导将不胜感激。

python keras dask dask-distributed dask-delayed
1个回答
1
投票

如果要使用在上下文中创建的Client进行计算,则compute()行也必须在上下文中:请缩进它。

© www.soinside.com 2019 - 2024. All rights reserved.