我有 Airflow DAG,其中函数 _query 在 Hive 中执行查询并使用 SambaHook 将结果保存在网络文件夹中。
#...
def _query(query_params_callable: Callable, params: Mapping[str, Any], sql, file, **context):
query_params = query_params_callable(**params)
smb_hook = SambaHook(samba_conn_id=conn_id_samba)
with smb_hook.open_file(path=sql, mode='r', share_access='r') as sql_file:
sql_query = sql_file.read().replace('$', '').format(**query_params)
temp_file = tempfile.NamedTemporaryFile(suffix='.xlsx', delete=False).name
hook = HiveServer2Hook(hiveserver2_conn_id=conn_id_hive)
df = hook.get_pandas_df(f"""{sql_query}""")
df.to_excel(temp_file, index=False, header=[col.split(".")[1] for col in df.columns])
file_path = file.format(**query_params)
smb_hook.push_from_local(file_path, temp_file)
task_instance = context['task_instance']
task_instance.xcom_push(key='rows_count', value=len(df))
task_instance.xcom_push(key='file_path', value=file_path)
#...
def generate_dag(dag_id, dag_settings, run_type):
with DAG(
dag_id=f'fr_alm_{dag_id}_{run_type}',
tags=['fusion', 'hive', 'excel', 'fr_alm_hive_to_excel', 'reports', 'generate'],
start_date=airflow.utils.dates.days_ago(10),
schedule_interval=dag_settings['schedule'],
render_template_as_native_obj=True,
params={
'report_date': None
}
) as dag:
start = EmptyOperator(task_id='start')
task_query = PythonOperator(
task_id='query',
python_callable=_query,
queue=airflow_queue_hadoop,
op_kwargs={
'query_params_callable': types[run_type]['callable'],
'params': {
'report_date': '{{ params.report_date }}',
'ds': '{{ ds }}',
'delay': dag_settings['delay'],
},
'sql': dag_settings['sql'],
'file': dag_settings['file'],
},
provide_context=True
)
task_email = EmailOperator(
task_id='email',
on_execute_callback=email_on_execute_callback
)
end = EmptyOperator(task_id='end')
start >> task_query >> end
#...
我的目标是在task_query之后执行EmailOperator,但在此之前我需要将具有Samba授权的文件下载到执行EmailOperator的节点的本地文件系统。
我该怎么做?
我想了几个办法:
您对可能的解决方案有何看法?
pre_execute 方法是在执行任务之前运行的回调,而实际任务在执行方法期间运行。
因此,您可以编写 pre_execute 代码,将文件复制到节点中的临时目录,然后通过电子邮件发送。
另一个选项(如果可能)是将文件上传到 S3(或类似的),然后下载它,甚至预先签名(url)而不是文件。