dag中的气流任务状态

问题描述 投票:11回答:4

我需要任务的状态,例如它是否在同一dag中正在运行,正在升级或失败。所以我尝试使用下面的代码来获取它,尽管我没有输出...

Auto = PythonOperator(
    task_id='test_sleep',
    python_callable=execute_on_emr,
    op_kwargs={'cmd':'python /home/hadoop/test/testsleep.py'},
    dag=dag)

logger.info(Auto)

目的是在完成特定的气流任务后杀死某些正在运行的任务。

问题是我如何获得任务的状态,例如它处于运行状态还是失败或成功

python airflow apache-airflow
4个回答
12
投票

我正在做类似的事情。我需要检查一项任务是否成功完成了另一项任务的前10次。taky2使我走上了正确的道路。实际上很简单:

from airflow.models import TaskInstance
ti = TaskInstance(*your_task*, execution_date)
state = ti.current_state()

由于我要检查dag中的内容,因此不必指定dag。我只是创建了一个函数来遍历过去的n_days并检查状态。

def check_status(**kwargs):
    last_n_days = 10
    for n in range(0,last_n_days):
        date = kwargs['execution_date']- timedelta(n)
        ti = TaskInstance(*my_task*, date) #my_task is the task you defined within the DAG rather than the task_id (as in the example below: check_success_task rather than 'check_success_days_before') 
        state = ti.current_state()
        if state != 'success':
            raise ValueError('Not all previous tasks successfully completed.')

当调用该函数时,请确保设置provide_context。

check_success_task = PythonOperator(
    task_id='check_success_days_before',
    python_callable= check_status,
    provide_context=True,
    dag=dag
)

更新:当您想从另一个dag调用任务时,您需要这样调用它:

from airflow import configuration as conf
from airflow.models import DagBag, TaskInstance

dag_folder = conf.get('core','DAGS_FOLDER')
dagbag = DagBag(dag_folder)
check_dag = dagbag.dags[*my_dag_id*]
my_task = check_dag.get_task(*my_task_id*)
ti = TaskInstance(my_task, date)

显然,现在做同样的事情也有一个api调用:

from airflow.api.common.experimental.get_task_instance import get_task_instance
ti = get_task_instance(*my_dag_id*, *my_task_id*, date)

6
投票

好吧,我想我知道您在做什么,我并不完全同意,但是我会从一个答案开始。

一种简单但不可靠的方法是查询task_instance表。我在postgres,但是结构应该相同。首先通过数据库调用获取您感兴趣的task_ids和任务状态。

SELECT task_id, state
FROM task_instance
WHERE dag_id = '<dag_id_attrib>'
  AND execution_date = '<execution_date_attrib>'
  AND task_id = '<task_to_check>'

这应该为您提供您要监视的任务的状态(和名称,以供参考)。状态存储为简单的小写字符串。


3
投票

看看Priyank建议的负责命令行界面操作的代码。

https://github.com/apache/incubator-airflow/blob/2318cea74d4f71fba353eaca9bb3c4fd3cdb06c0/airflow/bin/cli.py#L581

def task_state(args):
    dag = get_dag(args)
    task = dag.get_task(task_id=args.task_id)
    ti = TaskInstance(task, args.execution_date)
    print(ti.current_state())

因此,看来您应该可以使用类似的代码轻松地在DAG代码库中完成此操作。

或者,您可以使用python的subprocess库从代码中执行这些CLI操作。


2
投票

您可以为此使用命令行界面:

 airflow task_state [-h] [-sd SUBDIR] dag_id task_id execution_date

有关此内容的更多信息,请参考官方气流文档:

http://airflow.incubator.apache.org/cli.html

© www.soinside.com 2019 - 2024. All rights reserved.