Airflow AttributeError: 'function' object has no attribute 'update_relative'


I am testing Airflow and get the following error when implementing a DAG.

ERROR [airflow.models.dagbag.DagBag] Failed to import: /Users/sbhardwa/airflow/dags/heart_desease/inference_pipeline.py
Traceback (most recent call last):
  File "/Users/sbhardwa/airflow/airflow_venv/lib/python3.12/site-packages/airflow/models/dagbag.py", line 351, in parse
    loader.exec_module(new_module)
  File "<frozen importlib._bootstrap_external>", line 995, in exec_module
  File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
  File "/Users/sbhardwa/airflow/dags/heart_desease/inference_pipeline.py", line 57, in <module>
    normalise_task.set_downstream(predict_data)
  File "/Users/sbhardwa/airflow/airflow_venv/lib/python3.12/site-packages/airflow/models/taskmixin.py", line 262, in set_downstream
    self._set_relatives(task_or_task_list, upstream=False, edge_modifier=edge_modifier)
  File "/Users/sbhardwa/airflow/airflow_venv/lib/python3.12/site-packages/airflow/models/taskmixin.py", line 214, in _set_relatives
    task_object.update_relative(self, not upstream, edge_modifier=edge_modifier)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'function' object has no attribute 'update_relative'

Here is the code:

from airflow import DAG
from airflow.operators.python import PythonOperator

from datetime import datetime, timedelta
from heart_desease.utils import *


default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023,7,30),
    'retries' : 1, 
    'retry_delay' : timedelta(seconds=30)
}


ML_inference_dag = DAG(
                dag_id = 'Heart_dag',
                default_args = default_args,
                description = 'Dag to run inferences on predictions of heart disease patients',
                schedule_interval = '@hourly'
)

load_task = PythonOperator(
            task_id = 'load_task',
            python_callable = get_inference_data,
            dag = ML_inference_dag)

######
#Define task for encoding the categorical variables here
######

encode_task = PythonOperator(
            task_id = 'encode_task',
            python_callable = encode_features,
            dag = ML_inference_dag)

######
#Define task for normalising the variables here
######

normalise_task = PythonOperator(
            task_id = 'normalise_task',
            python_callable = normalize_data,
            dag = ML_inference_dag)

######
#Define task for getting models prediction here
######

prediction_task = PythonOperator(
            task_id = 'prediction_task',
            python_callable = predict_data,
            dag = ML_inference_dag)

load_task.set_downstream(encode_task)
encode_task.set_downstream(normalise_task)
normalise_task.set_downstream(predict_data)

Here are the Airflow details (providers info):

apache-airflow-providers-common-io | 1.3.1
apache-airflow-providers-common-sql | 1.13.0
apache-airflow-providers-fab | 1.1.0
apache-airflow-providers-ftp | 3.9.0
apache-airflow-providers-http | 4.11.0
apache-airflow-providers-imap | 3.6.0
apache-airflow-providers-smtp | 1.7.0
apache-airflow-providers-sqlite | 3.8.0

When I remove the "set_downstream" lines, the DAG loads without errors. I searched on Google but could not find a solution to this error.

Please advise.

airflow
1 Answer

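predict_data is the callable function that you assigned to prediction_task to run. In set_downstream you should reference the prediction_task variable, which is the task itself, rather than the callable it runs: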

normalise_task.set_downstream(prediction_task)
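
To see why the original line fails, here is a minimal sketch in plain Python. FakeTask is a hypothetical, heavily simplified stand-in for an Airflow operator, not Airflow's real implementation; it only mimics the fact that set_downstream calls update_relative on whatever object it receives, as the traceback shows.

```python
# FakeTask is a hypothetical stand-in for an Airflow operator (an assumption
# for illustration only); it is NOT Airflow's actual implementation.
class FakeTask:
    def __init__(self, task_id, python_callable):
        self.task_id = task_id
        self.python_callable = python_callable
        self.upstream_ids = set()
        self.downstream_ids = set()

    def update_relative(self, other, upstream):
        # Record `other` on the requested side of this task.
        target = self.upstream_ids if upstream else self.downstream_ids
        target.add(other.task_id)

    def set_downstream(self, other):
        # As in the traceback, this assumes `other` is a task object
        # that has an update_relative method.
        other.update_relative(self, upstream=True)
        self.downstream_ids.add(other.task_id)


def predict_data():
    # Placeholder for the real prediction function from the question.
    return "prediction"


normalise_task = FakeTask("normalise_task", python_callable=lambda: None)
prediction_task = FakeTask("prediction_task", python_callable=predict_data)

# Passing the callable reproduces the kind of error seen in the question:
# a plain function has no update_relative attribute.
error_message = None
try:
    normalise_task.set_downstream(predict_data)
except AttributeError as exc:
    error_message = str(exc)

# Passing the task object wires the dependency as intended.
normalise_task.set_downstream(prediction_task)
```

In real Airflow code the same dependency can also be written with the bitshift operator, normalise_task >> prediction_task, which likewise expects task objects on both sides.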