curl command in a BashOperator on Cloud Composer

Question (votes: 0, answers: 2)

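I am following the tutorial mentioned at this link: download_rocket_launches.py. When I run this program in Cloud Composer, I want to write to a local path, i.e. /home/airflow/gcs/dags, but it fails with a "path not found" error.

What path can I give to make this command work? This is the task I am trying to run: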

download_launches = BashOperator(
    task_id="download_launches",
    bash_command="curl -o /tmp/launches.json -L 'https://ll.thespacedevs.com/2.0.0/launch/upcoming'",  # noqa: E501
    dag=dag,
)
bash curl google-cloud-composer
2 Answers

0 votes

This worked for me:

import json
import pathlib

import airflow.utils.dates
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="download_rocket_launches",
    description="Download rocket pictures of recently launched rockets.",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval="@daily",
)

download_launches = BashOperator(
    task_id="download_launches",
    bash_command="curl -o /home/airflow/gcs/data/launches.json -L 'https://ll.thespacedevs.com/2.0.0/launch/upcoming' ",  # note the space between the closing single quote and the double quote
    dag=dag,
)


download_launches 

The key is to add a space between the single quote (') and the double quote (") at the end of the bash command.
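The answer does not say why the trailing space helps. For reference, the Airflow documentation recommends a trailing space when bash_command ends in a script name such as .sh, because Airflow otherwise tries to load it as a Jinja template file; whether that applies to this URL is not confirmed here. As a minimal sketch, the command string could be built so that the space is never forgotten. The helper name build_curl_command is hypothetical and not part of Airflow.

```python
import shlex


def build_curl_command(url: str, output_path: str) -> str:
    """Return a curl command string that always ends with a trailing space.

    Hypothetical helper for illustration; it is not an Airflow API.
    """
    # shlex.quote protects the arguments if they contain spaces or
    # shell metacharacters; plain paths and URLs are left unchanged.
    return f"curl -o {shlex.quote(output_path)} -L {shlex.quote(url)} "


command = build_curl_command(
    "https://ll.thespacedevs.com/2.0.0/launch/upcoming",
    "/home/airflow/gcs/data/launches.json",
)
print(repr(command))
```

The resulting string can be passed as bash_command to the BashOperator shown above.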

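It is also recommended to use the data folder for the output file, as described in the GCP documentation:

gs://bucket-name/data /home/airflow/gcs/data: Stores the data that tasks produce and use. This folder is mounted on all worker nodes.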
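The mapping quoted above can be sketched as a small path translation. This assumes the environment's bucket is mounted at /home/airflow/gcs, as the quoted documentation line implies; the function name gcs_uri_to_local_path and the bucket name are hypothetical.

```python
from pathlib import PurePosixPath

# Assumed mount point of the environment's bucket on Composer workers.
GCS_MOUNT = PurePosixPath("/home/airflow/gcs")


def gcs_uri_to_local_path(uri: str) -> str:
    """Translate gs://<bucket>/<object path> to the mounted local path."""
    if not uri.startswith("gs://"):
        raise ValueError(f"not a gs:// URI: {uri!r}")
    # Drop the scheme, then split the bucket name from the object path.
    _, _, object_path = uri[len("gs://"):].partition("/")
    if not object_path:
        raise ValueError(f"URI has no object path: {uri!r}")
    return str(GCS_MOUNT / object_path)


local_path = gcs_uri_to_local_path("gs://bucket-name/data/launches.json")
print(local_path)
```

A file written by a task to this local path should then show up under the data/ folder of the bucket.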


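0 votes

I am following the same tutorial as well. Could you provide the correct code for the Airflow DAG? Here is my problem: the first task runs successfully, but the second task fails with the error shown below. I have tried changing the encoding to UTF-16 and UTF-32, but that did not help.

raise JSONDecodeError("Expecting value", s, err.value) from None

My Airflow DAG: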

import json
import pathlib
import requests

import requests.exceptions as requests_exceptions
from airflow import DAG 
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import timedelta, datetime

dag = DAG(
    dag_id = "download_rocket_launches",
    start_date = datetime(2023,1,8),
    schedule_interval = None
)

download_launches = BashOperator(
    task_id = "download_launches",
    bash_command = "curl -o /tmp/launches.json 'https://ll.thespacedevs.com/2.2.0/launch/upcoming' ",
    dag = dag,
)

def _get_pictures():
    pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok = True)
    with open("/tmp/launches.json", encoding='UTF-8') as f:
        launches = json.load(f)
        image_urls = [launch["image"] for launch in launches["results"]]
        for image_url in image_urls:
            try:
                response = requests.get(image_url)
                image_filename = image_url.split("/")[-1]
                target_file = f"/tmp/images/{image_filename}"
                with open(target_file, "wb") as f:
                    f.write(response.content)
                print(f"Downloaded {image_url} to {target_file} ")
            except requests_exceptions.MissingSchema:
                print(f"{image_url} appears to be an invalid URL.")
            except requests_exceptions.ConnectionError:
                print(f"Could not connect to {image_url}.")

get_pictures = PythonOperator(
    task_id = "get_pictures",
    python_callable = _get_pictures,
    dag= dag,
)

notify = BashOperator(
    task_id = "notify",
    bash_command = 'echo "There are now $(ls /tmp/images/ | wc -l) images. " ',
    dag= dag, 
)

download_launches>>get_pictures>>notify
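
One possible cause, not confirmed anywhere in this thread: the curl command in this DAG omits the -L flag that the first answer's command uses. If the endpoint answers with a redirect, curl -o saves the redirect response instead of the JSON payload, and json.load then fails with "Expecting value" regardless of the encoding. A minimal sketch for checking what was actually saved follows; the function name describe_json_file is hypothetical, and the temporary files only stand in for /tmp/launches.json.

```python
import json
import os
import tempfile


def describe_json_file(path: str, preview_chars: int = 200) -> str:
    """Report whether a file holds valid JSON, with a preview if it does not."""
    with open(path, encoding="utf-8", errors="replace") as f:
        text = f.read()
    if not text.strip():
        return "empty file"
    try:
        json.loads(text)
    except json.JSONDecodeError as err:
        return f"not JSON ({err.msg}); starts with: {text[:preview_chars]!r}"
    return "valid JSON"


def check_content(content: str) -> str:
    """Write content to a temporary file and describe it (demo only)."""
    with tempfile.NamedTemporaryFile(
        "w", suffix=".json", delete=False, encoding="utf-8"
    ) as tmp:
        tmp.write(content)
    try:
        return describe_json_file(tmp.name)
    finally:
        os.remove(tmp.name)


empty_result = check_content("")
html_result = check_content("<html>redirect</html>")
ok_result = check_content('{"results": []}')
print(empty_result, html_result, ok_result, sep="\n")
```

Calling describe_json_file("/tmp/launches.json") at the start of _get_pictures would show whether the download step produced JSON at all. Note also that /tmp is local to each worker, so on a multi-worker environment the shared /home/airflow/gcs/data folder from the first answer may be the safer place for the file.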
    