Why does my simple HttpOperator crash my local Airflow scheduler?

Problem description

I installed Airflow on macOS, running standalone with the SQLite db, and I'm only doing simple tests. I can run BashOperators successfully, but for some reason my scheduler crashes every time I try this HttpOperator. I kill all Airflow processes and restart with "airflow standalone", but the same behavior repeats. I don't know how to troubleshoot this. Could it really be a limitation of SQLite/SequentialExecutor with no way around it? Could it be related to the https operator?

The endpoint works: https://gorest.co.in/public/v2/posts

The connection is set up correctly: test_api http https://gorest.co.in/public/v2 False False
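For completeness, a rough programmatic sketch of an equivalent connection (an illustration on my part, using Airflow's Connection model and ORM session; the 'test_api' values match the listing above):

from airflow import settings
from airflow.models import Connection

# Hypothetical programmatic equivalent of the 'test_api' connection listed above.
conn = Connection(
    conn_id="test_api",
    conn_type="http",
    host="https://gorest.co.in/public/v2",
)
session = settings.Session()
session.add(conn)
session.commit()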

apache-airflow==2.5.1

Thanks for your help.

PycharmProjects/composer/logs/dag_id=test_api/run_id=scheduled__2024-01-30T09:00:00+00:00/task_id=is_api_active/attempt=4.log
[2024-01-31, 11:40:28 UTC] {taskinstance.py:1083} INFO - Dependencies all met for <TaskInstance: test_api.is_api_active scheduled__2024-01-30T09:00:00+00:00 [queued]>
[2024-01-31, 11:40:28 UTC] {taskinstance.py:1083} INFO - Dependencies all met for <TaskInstance: test_api.is_api_active scheduled__2024-01-30T09:00:00+00:00 [queued]>
[2024-01-31, 11:40:28 UTC] {taskinstance.py:1279} INFO - 
--------------------------------------------------------------------------------
[2024-01-31, 11:40:28 UTC] {taskinstance.py:1280} INFO - Starting attempt 4 of 4
[2024-01-31, 11:40:28 UTC] {taskinstance.py:1281} INFO - 
--------------------------------------------------------------------------------
[2024-01-31, 11:40:28 UTC] {taskinstance.py:1300} INFO - Executing <Task(HttpSensor): is_api_active> on 2024-01-30 09:00:00+00:00
[2024-01-31, 11:40:28 UTC] {standard_task_runner.py:55} INFO - Started process 44845 to run task
[2024-01-31, 11:40:28 UTC] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'test_api', 'is_api_active', 'scheduled__2024-01-30T09:00:00+00:00', '--job-id', '29', '--raw', '--subdir', 'DAGS_FOLDER/test_api.py', '--cfg-path', '/var/folders/84/5q_1yj555494bdjdy7m78pv40000gp/T/tmp6pg6_8t6']
[2024-01-31, 11:40:28 UTC] {standard_task_runner.py:83} INFO - Job 29: Subtask is_api_active
[2024-01-31, 11:40:28 UTC] {task_command.py:388} INFO - Running <TaskInstance: test_api.is_api_active scheduled__2024-01-30T09:00:00+00:00 [running]> on host pc-156.home
[2024-01-31, 11:40:28 UTC] {taskinstance.py:1507} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=xxxxxx
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_api
AIRFLOW_CTX_TASK_ID=is_api_active
AIRFLOW_CTX_EXECUTION_DATE=2024-01-30T09:00:00+00:00
AIRFLOW_CTX_TRY_NUMBER=4
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2024-01-30T09:00:00+00:00
[2024-01-31, 11:40:28 UTC] {http.py:122} INFO - Poking: posts/
[2024-01-31, 11:40:28 UTC] {base.py:73} INFO - Using connection ID 'test_api' for task execution.

DAG code:

import json

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

from airflow.providers.http.sensors.http import HttpSensor
from airflow.utils.dates import days_ago
from airflow.providers.http.operators.http import SimpleHttpOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': True,
    'retries': 0,
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}

with DAG(
    'test_api',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval="0 9 * * *",
    start_date=days_ago(2),
    catchup=False,
    tags=['example'],

) as dag:

    def debug(response):
        print(response)
        if "200" in response:
            return True
        return False

    task_is_api_active = HttpSensor(
        task_id="is_api_active",
        http_conn_id='test_api',
        endpoint='posts/'
    )
1 Answer

A few things I think need correcting:

  1. The http connection test_api does not end with a /. So your request will be sent to ...v2posts/ instead of v2/posts.

  2. If your endpoint is https, add

    extra_options={'verify': False},

  3. You can also pass response_check=debug in the HttpSensor call to check the response.

  4. Before putting this DAG on a schedule, I suggest trying schedule_interval=None and seeing whether it runs (a combined sketch of these changes follows below).
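Putting the points above together, a minimal sketch of what the adjusted DAG could look like while debugging (my assumptions: verify=False and the simplified status-code check in debug are only for testing, and the leading slash on the endpoint just makes the URL join explicit):

from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor
from airflow.utils.dates import days_ago

with DAG(
    'test_api',
    schedule_interval=None,  # run it manually first; switch back to "0 9 * * *" once it works
    start_date=days_ago(2),
    catchup=False,
) as dag:

    def debug(response):
        # response is a requests.Response object
        print(response.status_code, response.text[:200])
        return response.status_code == 200

    task_is_api_active = HttpSensor(
        task_id="is_api_active",
        http_conn_id='test_api',
        endpoint='/posts',                # joins with the connection host as .../v2/posts
        extra_options={'verify': False},  # only while testing the https endpoint
        response_check=debug,
    )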
