当某个气流任务失败时,我需要进行回滚操作。要知道要回滚的内容,我需要访问回滚函数内的任务参数。定义任务时,回滚函数将传递给
on_failure_callback
参数。
以此作为一个简化的示例:
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
def rollback(context: dict):
print("How do I access the 'task_argument' value?")
@task(on_failure_callback=rollback)
def example_task(task_argument: str) -> None:
assert False
@dag(
schedule_interval=None,
start_date=days_ago(1),
)
def example_dag() -> None:
example_task("the task argument's value.")
example_dag()
如何获取传递给
example_task
内的 on_failure_callback
的值?我确定它隐藏在 context
变量中,但我无法找到有关 context
内部内容的明确文档。 context
确实包含字段 params
但不包含 task_argument
。
这个代码片段对我有用.. 基本上,如果您使用 @task 装饰器,您需要在函数参数中指定上下文变量。 根据此文档页面 https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/taskflow.html#taskflow
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
def rollback(context: dict):
print("How do I access the 'task_argument' value?")
print(context.get('params'))
@task(on_failure_callback=rollback)
def example_task(params: dict) -> None:
assert False
@dag(
schedule_interval=None,
start_date=days_ago(1),
)
def example_dag() -> None:
example_task(params={'mytask_param' : "the task argument's value."})
example_dag()