I'm testing Airflow and ran into the following error while implementing a DAG.
ERROR [airflow.models.dagbag.DagBag] Failed to import: /Users/sbhardwa/airflow/dags/heart_desease/inference_pipeline.py
Traceback (most recent call last):
File "/Users/sbhardwa/airflow/airflow_venv/lib/python3.12/site-packages/airflow/models/dagbag.py", line 351, in parse
loader.exec_module(new_module)
File "<frozen importlib._bootstrap_external>", line 995, in exec_module
File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
File "/Users/sbhardwa/airflow/dags/heart_desease/inference_pipeline.py", line 57, in <module>
normalise_task.set_downstream(predict_data)
File "/Users/sbhardwa/airflow/airflow_venv/lib/python3.12/site-packages/airflow/models/taskmixin.py", line 262, in set_downstream
self._set_relatives(task_or_task_list, upstream=False, edge_modifier=edge_modifier)
File "/Users/sbhardwa/airflow/airflow_venv/lib/python3.12/site-packages/airflow/models/taskmixin.py", line 214, in _set_relatives
task_object.update_relative(self, not upstream, edge_modifier=edge_modifier)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'function' object has no attribute 'update_relative'
Here is the code block -
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from heart_desease.utils import *

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 7, 30),
    'retries': 1,
    'retry_delay': timedelta(seconds=30)
}

ML_inference_dag = DAG(
    dag_id='Heart_dag',
    default_args=default_args,
    description='Dag to run inferences on predictions of heart disease patients',
    schedule_interval='@hourly'
)

load_task = PythonOperator(
    task_id='load_task',
    python_callable=get_inference_data,
    dag=ML_inference_dag)

######
# Define task for encoding the categorical variables here
######
encode_task = PythonOperator(
    task_id='encode_task',
    python_callable=encode_features,
    dag=ML_inference_dag)

######
# Define task for normalising the variables here
######
normalise_task = PythonOperator(
    task_id='normalise_task',
    python_callable=normalize_data,
    dag=ML_inference_dag)

######
# Define task for getting the model's prediction here
######
prediction_task = PythonOperator(
    task_id='prediction_task',
    python_callable=predict_data,
    dag=ML_inference_dag)

load_task.set_downstream(encode_task)
encode_task.set_downstream(normalise_task)
normalise_task.set_downstream(predict_data)
Here are the Airflow details - provider info:

apache-airflow-providers-common-io 1.3.1
apache-airflow-providers-common-sql 1.13.0
apache-airflow-providers-fab 1.1.0
apache-airflow-providers-ftp 3.9.0
apache-airflow-providers-http 4.11.0
apache-airflow-providers-imap 3.6.0
apache-airflow-providers-smtp 1.7.0
apache-airflow-providers-sqlite 3.8.0
When I remove the "set_downstream" line, the DAG runs without errors. I googled the error but couldn't find anything that addresses it.
Please advise.
predict_data is the callable function you assigned to prediction_task to run. You should reference the prediction_task variable instead, so that you pass the task itself rather than the callable it runs:

normalise_task.set_downstream(prediction_task)
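To see why the traceback ends in AttributeError: set_downstream ultimately calls update_relative on whatever it receives, and a plain function has no such attribute. Here is a toy stand-in (hypothetical classes, not Airflow's real API) that reproduces the same failure without needing Airflow installed:

```python
class Task:
    """Toy stand-in for an Airflow task (hypothetical, not Airflow's real class)."""
    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = []
        self.downstream = []

    def update_relative(self, other, upstream):
        # Record the other task on the appropriate side of the dependency.
        (self.upstream if upstream else self.downstream).append(other)

    def set_downstream(self, task):
        # Like Airflow, this calls update_relative on the argument --
        # so the argument must be a task object, not a plain function.
        task.update_relative(self, upstream=True)
        self.downstream.append(task)


def predict_data():
    """A plain callable -- it has no update_relative attribute."""


normalise_task = Task('normalise_task')
prediction_task = Task('prediction_task')

normalise_task.set_downstream(prediction_task)   # works: both are Task objects

try:
    normalise_task.set_downstream(predict_data)  # fails, like the DAG above
except AttributeError as err:
    print(err)  # 'function' object has no attribute 'update_relative'
```

In the real DAG, `normalise_task.set_downstream(prediction_task)` fixes it; equivalently you can use Airflow's bitshift syntax, `load_task >> encode_task >> normalise_task >> prediction_task`, where passing a bare function tends to stand out more.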