我正在尝试提交 EMR 作业。 EC2 上的电子病历。我正在起诉 Airflow 提供的代码。按照 Apache Airflow 的建议,使用 Docker 安装 Airflow。
从日期时间导入时间增量 从气流导入 DAG 从 airflow.providers.amazon.aws.operators.emr_create_job_flow 导入 EmrCreateJobFlowOperator 从 airflow.providers.amazon.aws.sensors.emr_job_flow 导入 EmrJobFlowSensor 从 airflow.utils.dates 导入 days_ago
SPARK_STEPS = [ { '名称':'calculate_pi', 'ActionOnFailure':'继续', 'HadoopJarStep':{ '罐子':'command-runner.jar', 'Args': ['/usr/lib/spark/bin/run-example', 'SparkPi', '10'], }, } ]
JOB_FLOW_OVERRIDES = { '名称':'PiCalc', 'ReleaseLabel': 'emr-5.29.0', '实例':{ '实例组':[ { '名称': '主节点', '市场':'现货', '实例角色':'主人', 'InstanceType': 'm1.medium', '实例计数':1, } ], 'KeepJobFlowAliveWhenNoSteps':假, '终止保护':假, }, '步骤':SPARK_STEPS, 'JobFlowRole': 'EMR_EC2_DefaultRole', 'ServiceRole': 'EMR_DefaultRole', }
有向无环图( dag_id='emr_job_flow_automatic_steps_dag', 默认参数={ '所有者':'气流', 'depends_on_past':假, '电子邮件':['[email protected]'], 'email_on_failure':错误, 'email_on_retry':假, }, dagrun_timeout=timedelta(小时=2), 开始日期=天前(2), schedule_interval='0 3 * * *', 标签=['示例'], ) 作为 dag:
# [START howto_operator_emr_automatic_steps_tasks]
job_flow_creator = EmrCreateJobFlowOperator(
task_id='create_job_flow',
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_default',
emr_conn_id='emr_default',
)
job_sensor = EmrJobFlowSensor(
task_id='check_job_flow',
job_flow_id=job_flow_creator.output,
aws_conn_id='aws_default',
)
# [END howto_operator_emr_automatic_steps_tasks]
# Task dependency created via `XComArgs`:
# job_flow_creator >> job_sensor
######################################### 问题是:#
尽管在我的调度程序容器中安装了 amazon 提供程序,但给出错误提示无法导入模块。它们可以使用(在手动步骤中给出)
导入从 airflow.providers.amazon.aws.operators.emr 导入(
EmrAddStepsOperator,
EmrCreateJobFlowOperator,
EmrModifyClusterOperator,
EmrTerminateJobFlowOperator,
)
从 airflow.providers.amazon.aws.sensors.emr 导入 EmrJobFlowSensor
“线程“main”中的异常 java.lang.RuntimeException:java.io.IOException:无法运行程序“/usr/lib/spark/bin/run-example”(在目录“.”中):error=2,否这样的文件或目录“
我的问题是什么?期待一些帮助。谢谢