Airflow:在使用 EmailOperator 发送电子邮件之前下载文件

问题描述 投票:0回答:1

我有 Airflow DAG,其中函数 _query 在 Hive 中执行查询并使用 SambaHook 将结果保存在网络文件夹中。

#...
def _query(query_params_callable: Callable, params: Mapping[str, Any], sql, file, **context):
    query_params = query_params_callable(**params)

    smb_hook = SambaHook(samba_conn_id=conn_id_samba)

    with smb_hook.open_file(path=sql, mode='r', share_access='r') as sql_file:
        sql_query = sql_file.read().replace('$', '').format(**query_params)

    temp_file = tempfile.NamedTemporaryFile(suffix='.xlsx', delete=False).name
    hook = HiveServer2Hook(hiveserver2_conn_id=conn_id_hive)
    df = hook.get_pandas_df(f"""{sql_query}""")
    df.to_excel(temp_file, index=False, header=[col.split(".")[1] for col in df.columns])

    file_path = file.format(**query_params)
    smb_hook.push_from_local(file_path, temp_file)

    task_instance = context['task_instance']
    task_instance.xcom_push(key='rows_count', value=len(df))
    task_instance.xcom_push(key='file_path', value=file_path)

#...
def generate_dag(dag_id, dag_settings, run_type):
    with DAG(
            dag_id=f'fr_alm_{dag_id}_{run_type}',
            tags=['fusion', 'hive', 'excel', 'fr_alm_hive_to_excel', 'reports', 'generate'],
            start_date=airflow.utils.dates.days_ago(10),
            schedule_interval=dag_settings['schedule'],
            render_template_as_native_obj=True,
            params={
                'report_date': None
            }
    ) as dag:
        start = EmptyOperator(task_id='start')

        task_query = PythonOperator(
            task_id='query',
            python_callable=_query,
            queue=airflow_queue_hadoop,
            op_kwargs={
                'query_params_callable': types[run_type]['callable'],
                'params': {
                    'report_date': '{{ params.report_date }}',
                    'ds': '{{ ds }}',
                    'delay': dag_settings['delay'],
                },
                'sql': dag_settings['sql'],
                'file': dag_settings['file'],
            },
            provide_context=True
        )

        task_email = EmailOperator(
            task_id='email',
            on_execute_callback=email_on_execute_callback
        )

        end = EmptyOperator(task_id='end')

        start >> task_query >> end
#...

我的目标是在task_query之后执行EmailOperator,但在此之前我需要将具有Samba授权的文件下载到执行EmailOperator的节点的本地文件系统。

我该怎么做?

我想了几个办法:

  1. 要使用 pre_execute - 我不确定它是否在执行 EmailOperator 的同一节点上工作。
  2. 使用 on_execute_callback - 与第一个相同
  3. 编写我自己的 SambaToEmailOperator。

您对可能的解决方案有何看法?

email airflow operators samba
1个回答
0
投票

pre_execute 方法是在执行任务之前运行的回调,而实际任务在执行方法期间运行。

因此,您可以编写 pre_execute 代码,将文件复制到节点中的临时目录,然后通过电子邮件发送。

另一个选项(如果可能)是将文件上传到 S3(或类似的),然后下载它,甚至预先签名(url)而不是文件。

© www.soinside.com 2019 - 2024. All rights reserved.