Airflow HiveOperator 结果集

问题描述 投票:0回答:1

我是 Airflow 和 Python 的新手,我正在尝试配置计划报告。该报告需要从 Hive 中提取数据并通过电子邮件发送结果。

到目前为止我的代码:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.hive_operator import HiveOperator

default_args = {
    'owner': 'me',
    'depends_on_past': False,
    'start_date': datetime(2015, 1, 1),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 3,
    'retry_delay': timedelta(hours=2)
}

dag = DAG(
    dag_id='hive_report',
    max_active_runs=1,
    default_args=default_args,
    schedule_interval='@once')

query = """
    #query goes here
"""

run_hive_query = HiveOperator(
    task_id="fetch_data",
    hql=query,
    dag=dag
)

我很确定我需要添加一个 EmailOperator 任务来发送结果,因为这似乎只配置为在失败或重试时发送电子邮件。

我的问题是:Hive 运算符对结果集做什么?将结果集从一项任务传递到另一项任务的最佳方式是什么?

python hadoop hive airflow
1个回答
0
投票

您还需要添加 Hive 连接 ID(用于连接到 Hive 数据库的 Airflow 连接的 ID)作为 HiveOperator 中的参数,以便 Airflow 可以连接到您的数据库:

run_hive_query = HiveOperator(
task_id="fetch_data",
hql=query,
hive_cli_conn_id='my_conn_id'
dag=dag

HiveOperator 参数

© www.soinside.com 2019 - 2024. All rights reserved.