在 on_failure_callback 函数中访问气流任务参数

问题描述 投票:0回答:1

当某个气流任务失败时,我需要进行回滚操作。要知道要回滚的内容,我需要访问回滚函数内的任务参数。定义任务时,回滚函数将传递给

on_failure_callback
参数。

以此作为一个简化的示例:

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago


def rollback(context: dict):
    print("How do I access the 'task_argument' value?")

@task(on_failure_callback=rollback)
def example_task(task_argument: str) -> None:
    assert False
    
@dag(
    schedule_interval=None,
    start_date=days_ago(1),
)
def example_dag() -> None:
    example_task("the task argument's value.")
    
example_dag()

如何获取传递给

example_task
内的
on_failure_callback
的值?我确定它隐藏在
context
变量中,但我无法找到有关
context
内部内容的明确文档。
context
确实包含字段
params
但不包含
task_argument

python airflow rollback
1个回答
0
投票

这个代码片段对我有用.. 基本上,如果您使用 @task 装饰器,您需要在函数参数中指定上下文变量。 根据此文档页面 https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/taskflow.html#taskflow

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago


def rollback(context: dict):
    print("How do I access the 'task_argument' value?")
    print(context.get('params'))

@task(on_failure_callback=rollback)
def example_task(params: dict) -> None:
    assert False
    
@dag(
    schedule_interval=None,
    start_date=days_ago(1),
)
def example_dag() -> None:
    example_task(params={'mytask_param' : "the task argument's value."})
    
example_dag()

© www.soinside.com 2019 - 2024. All rights reserved.