MWAA Airflow "Executor reports task instance <TaskInstance: ... [queued]> finished (success) although the task says its queued." causes tasks to fail


We run an AWS Managed Airflow (MWAA, 2.2.2) environment with Celery and multiple schedulers, and have noticed that many tasks using sensors (time-based reschedule sensors) appear to get marked as failed, even though some of them keep running through to completion.

The sensor operator is a fairly simple API checker that polls an API every 5 minutes until it receives a specific response.
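
For context, a minimal sketch of what such a sensor might look like (the real CustomSensorOperator differs; the endpoint and response check here are purely illustrative):

import requests
from airflow.sensors.base import BaseSensorOperator

class CustomSensorOperator(BaseSensorOperator):
    """Polls an API every 5 minutes until it returns the expected response."""

    def __init__(self, endpoint, **kwargs):
        # mode="reschedule" frees the worker slot between pokes, so the task
        # cycles through up_for_reschedule/queued/running between checks
        super().__init__(mode="reschedule", poke_interval=300, **kwargs)
        self.endpoint = endpoint

    def poke(self, context):
        # True marks the sensor as done; False reschedules another poke
        response = requests.get(self.endpoint, timeout=30)
        return response.ok and response.json().get("status") == "ready"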

We wrote an on_failure_handler that catches all of these occurrences and sends a Slack notification.
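
A minimal sketch of such a handler, assuming a Slack incoming webhook URL stored in an Airflow Variable (the variable name and message format are hypothetical):

import requests
from airflow.models import Variable

def on_failure_handler(context):
    # Called by Airflow with the task context when a task instance fails
    ti = context["task_instance"]
    message = (
        f"Task failed: {ti.dag_id}.{ti.task_id} "
        f"run_id={ti.run_id} try_number={ti.try_number}"
    )
    requests.post(
        Variable.get("slack_webhook_url"),
        json={"text": message},
        timeout=10,
    )

# Attached per task (or via default_args), e.g.:
# CustomSensorOperator(task_id="task_xyz", ..., on_failure_callback=on_failure_handler)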

We typically see the following sequence of logs in CloudWatch (for some reason no worker logs are available):

{
    "@timestamp": "2023-01-19 10:57:39.371",
    "@message": "[\u001b[34m2023-01-19 10:57:38,924\u001b[0m] {{\u001b[34mscheduler_job.py:\u001b[0m563}} INFO\u001b[0m - TaskInstance Finished: dag_id=custom_dag, task_id=task_xyz, run_id=run_123, run_start_date=2023-01-19 10:50:17.473702+00:00, run_end_date=2023-01-19 10:57:31.030716+00:00, run_duration=433.557014, state=failed, executor_state=success, try_number=1, max_tries=0, job_id=5942262, pool=default_pool, queue=airflow-celery-f742a6b6-c8cc-4a29-81a0-eb81cc44e889, priority_weight=7, operator=CustomSensorOperator\u001b[0m\n",
    "@logStream": "scheduler_console_ip-10-1-2-43.ec2.internal_1673666911.1179407.log"
},
{
    "@timestamp": "2023-01-19 10:57:39.122",
    "@message": "[\u001b[34m2023-01-19 10:57:38,863\u001b[0m] {{\u001b[34mscheduler_job.py:\u001b[0m510}} INFO\u001b[0m - Executor reports execution of custom_dag.task_xyz run_id=run_123 exited with status success for try_number 1\u001b[0m\n",
    "@logStream": "scheduler_console_ip-10-1-2-43.ec2.internal_1673666911.1179407.log"
},
{
    "@timestamp": "2023-01-19 10:57:32.718",
    "@message": "[2023-01-19 10:57:32,718] {{taskinstance.py:1029}} INFO - Dependencies not met for <TaskInstance: custom_dag.task_xyz run_123 [failed]>, dependency 'Task Instance State' FAILED: Task is in the 'failed' state which is not a valid state for execution. The task must be cleared in order to be run.",
    "@logStream": "custom_dag/task_xyz/2023-01-19T07_41_27+00_00/1.log"
},
{
    # THIS CAUSES THE TASK INSTANCE TO FAIL EVEN THOUGH IT'S STILL RUNNING

    "@timestamp": "2023-01-19 10:57:31.730",
    "@message": "[\u001b[34m2023-01-19 10:57:30,661\u001b[0m] {{\u001b[34mscheduler_job.py:\u001b[0m572}} ERROR\u001b[0m - Executor reports task instance <TaskInstance: custom_dag.task_xyz run_123 [queued]> finished (success) although the task says its queued. (Info: None) Was the task killed externally?\u001b[0m\n",
    "@logStream": "scheduler_console_ip-10-1-3-8.ec2.internal_1674030934.1519198.log"
},
{
    "@timestamp": "2023-01-19 10:57:31.719",
    "@message": "[\u001b[34m2023-01-19 10:57:30,661\u001b[0m] {{\u001b[34mscheduler_job.py:\u001b[0m563}} INFO\u001b[0m - TaskInstance Finished: dag_id=custom_dag, task_id=task_xyz, run_id=run_123, run_start_date=2023-01-19 10:50:17.473702+00:00, run_end_date=2023-01-19 10:50:18.224642+00:00, run_duration=0.75094, state=queued, executor_state=success, try_number=1, max_tries=0, job_id=5942262, pool=default_pool, queue=airflow-celery-f742a6b6-c8cc-4a29-81a0-eb81cc44e889, priority_weight=7, operator=CustomSensorOperator\u001b[0m\n",
    "@logStream": "scheduler_console_ip-10-1-3-8.ec2.internal_1674030934.1519198.log"
},
{
    "@timestamp": "2023-01-19 10:57:30.740",
    "@message": "[\u001b[34m2023-01-19 10:57:30,580\u001b[0m] {{\u001b[34mscheduler_job.py:\u001b[0m510}} INFO\u001b[0m - Executor reports execution of custom_dag.task_xyz run_id=run_123 exited with status success for try_number 1\u001b[0m\n",
    "@logStream": "scheduler_console_ip-10-1-3-8.ec2.internal_1674030934.1519198.log"
},
{
    "@timestamp": "2023-01-19 10:57:27.854",
    "@message": "[\u001b[34m2023-01-19 10:57:27,612\u001b[0m] {{\u001b[34mscheduler_job.py:\u001b[0m538}} INFO\u001b[0m - Setting external_id for <TaskInstance: custom_dag.task_xyz run_123 [queued]> to c244ff79-0e92-4c3d-8c89-0440af154848\u001b[0m\n",
    "@logStream": "scheduler_console_ip-10-1-2-43.ec2.internal_1673666911.1179407.log"
},
{
    "@timestamp": "2023-01-19 10:57:27.544",
    "@message": "[\u001b[34m2023-01-19 10:57:27,465\u001b[0m] {{\u001b[34mscheduler_job.py:\u001b[0m510}} INFO\u001b[0m - Executor reports execution of custom_dag.task_xyz run_id=run_123 exited with status queued for try_number 1\u001b[0m\n",
    "@logStream": "scheduler_console_ip-10-1-2-43.ec2.internal_1673666911.1179407.log"
},
{

    # THIS IS THE SUSPICIOUS SETTING OF THE TASK TO STATE SUCCESS

    "@timestamp": "2023-01-19 10:57:26.361",
    "@message": "\t<TaskInstance: custom_dag.task_xyz run_123 [queued]> in state SUCCESS\n",
    "@logStream": "scheduler_console_ip-10-1-3-8.ec2.internal_1674030934.1519198.log"
},
{
    "@timestamp": "2023-01-19 10:57:26.054",
    "@message": "[\u001b[34m2023-01-19 10:57:25,349\u001b[0m] {{\u001b[34mbase_executor.py:\u001b[0m82}} INFO\u001b[0m - Adding to queue: ['airflow', 'tasks', 'run', 'custom_dag', 'task_xyz', 'run_123', '--local', '--subdir', 'DAGS_FOLDER/custom_dag.py']\u001b[0m\n",
    "@logStream": "scheduler_console_ip-10-1-2-43.ec2.internal_1673666911.1179407.log"
},
{
    "@timestamp": "2023-01-19 10:57:26.042",
    "@message": "[\u001b[34m2023-01-19 10:57:25,349\u001b[0m] {{\u001b[34mscheduler_job.py:\u001b[0m450}} INFO\u001b[0m - Sending TaskInstanceKey(dag_id='custom_dag', task_id='task_xyz', run_id='run_123', try_number=1) to executor with priority 7 and queue airflow-celery-f742a6b6-c8cc-4a29-81a0-eb81cc44e889\u001b[0m\n",
    "@logStream": "scheduler_console_ip-10-1-2-43.ec2.internal_1673666911.1179407.log"
},
{
    "@timestamp": "2023-01-19 10:57:25.796",
    "@message": "\t<TaskInstance: custom_dag.task_xyz run_123 [scheduled]>\n",
    "@logStream": "scheduler_console_ip-10-1-2-43.ec2.internal_1673666911.1179407.log"
},
{
    "@timestamp": "2023-01-19 10:57:25.542",
    "@message": "[2023-01-19 10:57:25,516: INFO/ForkPoolWorker-3] Executing command in Celery: ['airflow', 'tasks', 'run', 'custom_dag', 'task_xyz', 'run_123', '--local', '--subdir', 'DAGS_FOLDER/custom_dag.py']\n",
    "@logStream": "worker_console_ip-10-1-3-191.ec2.internal_1674110783.438406.log"
},
{
    "@timestamp": "2023-01-19 10:57:25.425",
    "@message": "\t<TaskInstance: custom_dag.task_xyz run_123 [scheduled]>\n",
    "@logStream": "scheduler_console_ip-10-1-2-43.ec2.internal_1673666911.1179407.log"
},

A few times the task was set to failed and the handler fired, yet the scheduler kept checking on the task instance until it actually finished successfully, while the failed state stayed in the database.

I suspect the scheduler message marking the task as [queued] in state SUCCESS is what triggers the failure, but I can't determine why this happens or what the root cause is. All tasks are triggered manually via the API and have no schedule, so I don't think the pickling and backfill solutions suggested elsewhere would help.

I'm not entirely sure whether this is a scheduler interval issue, but if it is, what could be causing it?

Based on the logStream values, it looks like two different schedulers are racing to set the task state.

I've gone through the logs trying to find the root cause, but there is no indication of it. We haven't restarted the schedulers yet, because in MWAA that means taking all of Airflow down for 20-30 minutes, which isn't ideal right now.

airflow task instance mwaa
1 Answer

I ran into some similar errors, e.g. "Executor reports task instance finished (failed) although the task says its queued." Hopefully my answer below helps you, or is at least a good starting point for solving your problem.

In MWAA (AWS managed Airflow) we solved our problem by adding an Airflow configuration option to the MWAA environment, setting celery.worker_autoscale to 5,5 (note that we run an mw1.medium with a maximum of 10 concurrent tasks, our Airflow version is 2.2.2, and we have roughly 20+ "jobs" running in parallel, each with 6 tasks). We found this fix buried in a GitHub issue.

The celery.worker_autoscale option sets the maximum and minimum concurrency a worker is started with. Essentially, what I believe was happening is that Celery was running the maximum number of processes (sensors, tasks, etc.) on one worker while another worker was running none, at which point this error gets thrown. I think this setting spreads the processes Celery has to run more evenly across its workers, which helps it log every state of a process, and that logging is what Airflow uses to recognise state transitions between tasks.
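
For reference, a minimal sketch of applying that override with boto3 (the environment name below is a placeholder; the same key/value can also be set in the MWAA console or via infrastructure-as-code):

import boto3

mwaa = boto3.client("mwaa")
mwaa.update_environment(
    Name="my-mwaa-environment",  # placeholder environment name
    AirflowConfigurationOptions={
        # "max,min" concurrency per Celery worker
        "celery.worker_autoscale": "5,5",
    },
)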

For more information on Airflow and MWAA configuration, see this Medium post.
