How to run a shell script on a Google Dataproc cluster using the Airflow DataprocOperator

Question

I'm trying to run a shell script on a Dataproc cluster after the cluster has been set up. I'm stuck on (or unsure about) what arguments to pass to the operator so that the .sh file is triggered once the cluster is up and running.

Sample Airflow code used to create the cluster:

create_cluster = DataprocClusterCreateOperator(
    task_id='create_dataproc_cluster',
    cluster_name=DAG_CONFIG['DATAPROC']['cluster_name'],
    project_id=DAG_CONFIG['PROJECT_ID'],
    num_workers=DAG_CONFIG['DATAPROC']['num_workers'],
    zone=DAG_CONFIG['DATAPROC']['zone'],
    subnetwork_uri=DAG_CONFIG['DATAPROC']['subnetwork_uri'],
    master_machine_type='n1-standard-1',
    master_disk_type='pd-standard',
    master_disk_size=50,
    worker_machine_type='n1-standard-1',
    worker_disk_type='pd-standard',
    worker_disk_size=50,
    auto_delete_ttl=DAG_CONFIG['DATAPROC']['auto_delete_ttl'],
    storage_bucket=DAG_CONFIG['GCS_STAGING']['bucket_name'],
    dag=DAG_ID)

This is where I need to submit the shell script, via DataProcHadoopOperator or whichever operator is appropriate.

Shell_Task = DataProcHadoopOperator(
    task_id='shell_Submit',
    main_jar='???',
    project_id='xxx',
    arguments= [??],
    job_name='{{task.task_id}}_{{ds_nodash}}',
    cluster_name=DAG_CONFIG['DATAPROC']['cluster_name'],
    gcp_conn_id='google_cloud_default',
    region=DAG_CONFIG['DATAPROC']['zone'],
    dag=DAG_ID)

Any help would be appreciated.

google-cloud-platform airflow google-cloud-dataproc
1 Answer

To run a shell script on every Dataproc VM during cluster creation, you should use Dataproc initialization actions.

You can specify them through the DataprocClusterCreateOperator:

DataprocClusterCreateOperator(
    # ...
    init_actions_uris = ['gs://<BUCKET>/path/to/init/action.sh'],
    # ...
)
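As a minimal sketch, this is how an initialization action could be wired into the cluster-creation task from the question. The GCS path of the script is a placeholder and the DAG_CONFIG keys are carried over from the question's snippet; the .sh file has to be uploaded to the bucket beforehand (e.g. with gsutil cp).

from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator

# Assumed GCS location of the shell script, uploaded in advance, e.g.:
#   gsutil cp my_setup.sh gs://<BUCKET>/init-actions/my_setup.sh
INIT_ACTION_URI = 'gs://{}/init-actions/my_setup.sh'.format(
    DAG_CONFIG['GCS_STAGING']['bucket_name'])

create_cluster = DataprocClusterCreateOperator(
    task_id='create_dataproc_cluster',
    cluster_name=DAG_CONFIG['DATAPROC']['cluster_name'],
    project_id=DAG_CONFIG['PROJECT_ID'],
    num_workers=DAG_CONFIG['DATAPROC']['num_workers'],
    zone=DAG_CONFIG['DATAPROC']['zone'],
    storage_bucket=DAG_CONFIG['GCS_STAGING']['bucket_name'],
    # The script is executed on every node while the cluster is being provisioned
    init_actions_uris=[INIT_ACTION_URI],
    dag=DAG_ID)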