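I am following the tutorial mentioned at this link - download_rocket_launches.py. When I run this program in Cloud Composer, I want to write to a native path, i.e. /home/airflow/gcs/dags, but it fails with a "path not found" error.
What path can I give to make this command work? This is the task I am trying to execute -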
download_launches = BashOperator(
    task_id="download_launches",
    bash_command="curl -o /tmp/launches.json -L 'https://ll.thespacedevs.com/2.0.0/launch/upcoming'",  # noqa: E501
    dag=dag,
)
This worked for me:
import json
import pathlib
import airflow.utils.dates
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
dag = DAG(
    dag_id="download_rocket_launches",
    description="Download rocket pictures of recently launched rockets.",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval="@daily",
)

download_launches = BashOperator(
    task_id="download_launches",
    bash_command="curl -o /home/airflow/gcs/data/launches.json -L 'https://ll.thespacedevs.com/2.0.0/launch/upcoming' ",  # put a space between the single quote and the double quote
    dag=dag,
)
download_launches
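The key is to add a space between the single quote ' and the double quote " at the end of the bash command.
The file is written to the data folder, which the GCP documentation describes as follows:
gs://bucket-name/data (mounted at /home/airflow/gcs/data): stores the data that tasks produce and use. This folder is mounted on all worker nodes.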
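As a minimal sketch of the path choice described above, the command string can be assembled from the data folder's mount point instead of being typed by hand. The names COMPOSER_DATA_DIR and build_download_command are hypothetical helpers introduced for illustration, not part of Airflow or Cloud Composer; the mount point is the one quoted from the GCP documentation.

```python
import shlex
from pathlib import PurePosixPath

# Local mount point of the bucket's data/ folder on Composer workers,
# taken from the GCP documentation quoted above.
COMPOSER_DATA_DIR = PurePosixPath("/home/airflow/gcs/data")


def build_download_command(url: str, filename: str) -> str:
    """Return a curl command that saves `url` into the Composer data folder.

    Hypothetical helper for illustration only.
    """
    target = COMPOSER_DATA_DIR / filename
    # shlex.quote adds quoting only when the value contains shell-special characters.
    return f"curl -o {shlex.quote(str(target))} -L {shlex.quote(url)}"


command = build_download_command(
    "https://ll.thespacedevs.com/2.0.0/launch/upcoming", "launches.json"
)
print(command)
```

The resulting string could then be passed as bash_command to the BashOperator, which keeps the target directory in one place if later tasks need to read the same file.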
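I am following the same tutorial. Could you provide the correct code for the Airflow DAG? Mine is below. Problem: the first task runs successfully, but the second task fails with the error shown here. I have already tried changing the encoding to UTF-16 and UTF-32, but that did not help.
raise JSONDecodeError("Expecting value", s, err.value) from None
My Airflow DAG: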
import json
import pathlib
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import timedelta, datetime
dag = DAG(
    dag_id="download_rocket_launches",
    start_date=datetime(2023, 1, 8),
    schedule_interval=None,
)

download_launches = BashOperator(
    task_id="download_launches",
    bash_command="curl -o /tmp/launches.json 'https://ll.thespacedevs.com/2.2.0/launch/upcoming' ",
    dag=dag,
)


def _get_pictures():
    # Make sure the target directory exists
    pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True)

    # Download every picture listed in launches.json
    with open("/tmp/launches.json", encoding="UTF-8") as f:
        launches = json.load(f)
        image_urls = [launch["image"] for launch in launches["results"]]
        for image_url in image_urls:
            try:
                response = requests.get(image_url)
                image_filename = image_url.split("/")[-1]
                target_file = f"/tmp/images/{image_filename}"
                with open(target_file, "wb") as f:
                    f.write(response.content)
                print(f"Downloaded {image_url} to {target_file}")
            except requests_exceptions.MissingSchema:
                print(f"{image_url} appears to be an invalid URL.")
            except requests_exceptions.ConnectionError:
                print(f"Could not connect to {image_url}.")


get_pictures = PythonOperator(
    task_id="get_pictures",
    python_callable=_get_pictures,
    dag=dag,
)

notify = BashOperator(
    task_id="notify",
    # wc -l (lowercase L) counts lines; "wc -1" is not a valid option
    bash_command='echo "There are now $(ls /tmp/images/ | wc -l) images." ',
    dag=dag,
)

download_launches >> get_pictures >> notify
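One way to narrow this down, offered as a sketch rather than a confirmed diagnosis: "Expecting value" usually means the file handed to json.load is empty or is not JSON at all, which changing the encoding cannot fix. The curl command in this DAG omits the -L flag that the tutorial's command uses, so if the API answers with a redirect, the redirect response is what gets saved instead of the launch data. The helper load_launches below is hypothetical and only reports what the file actually contains; the temporary files stand in for /tmp/launches.json.

```python
import json
import tempfile
from pathlib import Path


def load_launches(path: str) -> dict:
    """Parse the downloaded file, failing with a message that shows what was saved."""
    raw = Path(path).read_text(encoding="utf-8")
    if not raw.strip():
        # An empty body is what curl may save when it does not follow a redirect.
        raise ValueError(f"{path} is empty, so there is no JSON to parse")
    try:
        return json.loads(raw)
    except json.JSONDecodeError as exc:
        # Show the start of the file so an HTML or error page is easy to spot.
        raise ValueError(f"{path} is not JSON, it starts with {raw[:60]!r}") from exc


# Small self-check with temporary files standing in for /tmp/launches.json.
with tempfile.TemporaryDirectory() as tmp:
    good = Path(tmp) / "good.json"
    good.write_text(
        '{"results": [{"image": "https://example.com/a.jpg"}]}', encoding="utf-8"
    )
    parsed = load_launches(str(good))

    empty = Path(tmp) / "empty.json"
    empty.write_text("", encoding="utf-8")
    try:
        load_launches(str(empty))
        empty_error = None
    except ValueError as err:
        empty_error = str(err)

print(parsed["results"][0]["image"])
print(empty_error)
```

In the DAG, _get_pictures could call load_launches("/tmp/launches.json") in place of json.load. If the message reports an empty or non-JSON file, restoring -L in the curl command is the first thing to try.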