I installed Airflow on macOS, running standalone with the SQLite DB; I'm only doing simple testing. I can run BashOperators successfully, but for some reason my scheduler crashes every time I try this HTTP operator. I kill all Airflow processes and restart with `airflow standalone`, but the same behavior repeats. I don't know how to troubleshoot this. Is it really a limitation of SQLite/SequentialExecutor with no way around it? Could it be related to the endpoint being HTTPS?
The endpoint works: https://gorest.co.in/public/v2/posts
The connection is set up correctly: test_api http https://gorest.co.in/public/v2 False False
apache-airflow==2.5.1
Thanks for your help.
PycharmProjects/composer/logs/dag_id=test_api/run_id=scheduled__2024-01-30T09:00:00+00:00/task_id=is_api_active/attempt=4.log
[2024-01-31, 11:40:28 UTC] {taskinstance.py:1083} INFO - Dependencies all met for <TaskInstance: test_api.is_api_active scheduled__2024-01-30T09:00:00+00:00 [queued]>
[2024-01-31, 11:40:28 UTC] {taskinstance.py:1083} INFO - Dependencies all met for <TaskInstance: test_api.is_api_active scheduled__2024-01-30T09:00:00+00:00 [queued]>
[2024-01-31, 11:40:28 UTC] {taskinstance.py:1279} INFO -
--------------------------------------------------------------------------------
[2024-01-31, 11:40:28 UTC] {taskinstance.py:1280} INFO - Starting attempt 4 of 4
[2024-01-31, 11:40:28 UTC] {taskinstance.py:1281} INFO -
--------------------------------------------------------------------------------
[2024-01-31, 11:40:28 UTC] {taskinstance.py:1300} INFO - Executing <Task(HttpSensor): is_api_active> on 2024-01-30 09:00:00+00:00
[2024-01-31, 11:40:28 UTC] {standard_task_runner.py:55} INFO - Started process 44845 to run task
[2024-01-31, 11:40:28 UTC] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'test_api', 'is_api_active', 'scheduled__2024-01-30T09:00:00+00:00', '--job-id', '29', '--raw', '--subdir', 'DAGS_FOLDER/test_api.py', '--cfg-path', '/var/folders/84/5q_1yj555494bdjdy7m78pv40000gp/T/tmp6pg6_8t6']
[2024-01-31, 11:40:28 UTC] {standard_task_runner.py:83} INFO - Job 29: Subtask is_api_active
[2024-01-31, 11:40:28 UTC] {task_command.py:388} INFO - Running <TaskInstance: test_api.is_api_active scheduled__2024-01-30T09:00:00+00:00 [running]> on host pc-156.home
[2024-01-31, 11:40:28 UTC] {taskinstance.py:1507} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=xxxxxx
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_api
AIRFLOW_CTX_TASK_ID=is_api_active
AIRFLOW_CTX_EXECUTION_DATE=2024-01-30T09:00:00+00:00
AIRFLOW_CTX_TRY_NUMBER=4
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2024-01-30T09:00:00+00:00
[2024-01-31, 11:40:28 UTC] {http.py:122} INFO - Poking: posts/
[2024-01-31, 11:40:28 UTC] {base.py:73} INFO - Using connection ID 'test_api' for task execution.
DAG code:
import json
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from airflow.providers.http.sensors.http import HttpSensor
from airflow.utils.dates import days_ago
from airflow.providers.http.operators.http import SimpleHttpOperator
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': True,
    'retries': 0,
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}

with DAG(
    'test_api',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval="0 9 * * *",
    start_date=days_ago(2),
    catchup=False,
    tags=['example'],
) as dag:

    def debug(response):
        print(response)
        if "200" in response:
            return True
        return False

    task_is_api_active = HttpSensor(
        task_id="is_api_active",
        http_conn_id='test_api',
        endpoint='posts/'
    )
A few things I think need correcting:
The host of the HTTP connection test_api does not end with a /, so your request will be sent to ...v2posts/ instead of .../v2/posts.
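A quick way to see this concern, as a minimal sketch using plain string concatenation (which mirrors how the join is described above; the actual HttpHook join logic may differ between Airflow versions):

```python
# Naive join of the connection host and the sensor endpoint: with no '/'
# on either side of the boundary, the resulting URL is malformed.
base = "https://gorest.co.in/public/v2"  # connection host, no trailing '/'
endpoint = "posts/"                      # sensor endpoint

naive = base + endpoint
print(naive)   # https://gorest.co.in/public/v2posts/

# Supplying the separator explicitly avoids the problem:
fixed = base.rstrip("/") + "/" + endpoint
print(fixed)   # https://gorest.co.in/public/v2/posts/
```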
If your endpoint is HTTPS, add extra_options={'verify': False} to the sensor.
You can also pass response_check=debug in the HttpSensor call to inspect the response.
Before putting this DAG on a schedule, I'd suggest trying schedule_interval=None and seeing whether it runs.
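Putting these suggestions together, the response_check could look like the sketch below. One caveat (my assumption, not stated in the question): HttpSensor passes a requests.Response-like object to response_check, so the original check `"200" in response` won't do what was intended; comparing `response.status_code` is safer. The `FakeResponse` class here is purely a hypothetical stand-in so the function can be exercised without Airflow installed:

```python
# Hedged sketch of a corrected response_check for the HttpSensor.
def debug(response):
    # response is assumed to be a requests.Response-like object
    print(response.status_code)
    return response.status_code == 200

# Hypothetical stand-in for a quick local check (not part of Airflow):
class FakeResponse:
    status_code = 200

print(debug(FakeResponse()))  # True

# In the DAG, the sensor would then be declared roughly as:
# task_is_api_active = HttpSensor(
#     task_id="is_api_active",
#     http_conn_id='test_api',
#     endpoint='posts/',
#     response_check=debug,
#     extra_options={'verify': False},  # only if needed for HTTPS
# )
```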