我有几个 DAG,每个 DAG 包含几个 EMR 步骤。我创建了以下在所有 DAG 中使用的函数。此函数的目的是确定哪个 EMR 集群处于 RUNNING 或 PENDING 状态的活动步骤数量最少。该集群的 ID 返回到运算符中 op_kwargs 中的变量。
下面的 Python 运算符有一些 Python 函数“emr_step_adder”的输入参数,其中包括由函数
get_cluster_id()
返回的集群 ID。每个操作员必须调用 get_cluster_id()
来决定当时最合适的集群。这是由 boto3 EMR 功能完成的 emr_client.list_steps()
。
这里的问题是我的 DAG 正在创建大量(大约每秒 5 次)API 调用(我可以在 cloudtrail 中看到这一点)。这是由
emr_client.list_steps()
函数引起的(我可以在 cloudtrail 中看到这一点)。看起来 get_cluster_id()
函数正在执行,而没有 DAG 处于运行状态。它还抛出一个 ThrottlingException
,我可以在气流 UI 和日志中看到它。
我知道这可能与 DAG 解析有关。我需要更改什么才能仅在运行 DAG 时调用运算符中的
get_cluster_id()
函数?
def get_cluster_id(**kwargs):
emr_client = boto3.client("emr", 'us-east-1')
cluster_details = emr_client.list_clusters(ClusterStates=['STARTING','BOOTSTRAPPING','RUNNING','WAITING'])['Clusters']
cluster_list = []
for i in range(len(cluster_details)):
if cluster_details[i]['Name'] in ['cluster1', 'cluster2']:
cluster_id = cluster_details[i]['Id']
nr_of_active_steps = len(emr_client.list_steps(ClusterId=cluster_id, StepStates=['PENDING','RUNNING'])['Steps'])
cluster_list.append({'cluster_id':cluster_id, 'nr_of_active_steps':nr_of_active_steps})
return min(cluster_list, key=lambda x:x['nr_of_active_steps'])['cluster_id']
# Example operator
task1 = PythonOperator(
task_id='task1',
provide_context=True,
python_callable=emr_step_adder,
retries = 1,
retry_delay=timedelta(minutes=2),
op_kwargs={'task_name': 'task1'
,'cluster_id': get_cluster_id()
,'arg_list':['spark-submit', 's3://test/test.py']
,'timeout_in_seconds': 1800
},
dag=dag
)
只需将
get_cluster_id
移动到 emr_step_adder
内部,而不是将其作为参数传递。
def emr_step_adder(task_name, arg_list, timeout_in_seconds, **context):
cluster_id = get_cluster_id()
[...]
task1 = PythonOperator(
task_id='task1',
op_kwargs={'task_name': 'task1',
'arg_list': ['spark-submit', 's3://test/test.py'],
'timeout_in_seconds': 1800},
[...]
)
这样它只会在触发 DAG 时执行,而不是在每次解析 DAG 文件时执行。