Airflow - 使用 python 函数的返回值设置 python 运算符 op_kwargs 变量

问题描述 投票:0回答:1

我有几个 DAG,每个 DAG 包含几个 EMR 步骤。我创建了以下在所有 DAG 中使用的函数。此函数的目的是确定哪个 EMR 集群处于 RUNNING 或 PENDING 状态的活动步骤数量最少。该集群的 ID 返回到运算符中 op_kwargs 中的变量。

下面的 Python 运算符有一些 Python 函数“emr_step_adder”的输入参数,其中包括由函数

get_cluster_id()
返回的集群 ID。每个操作员必须调用
get_cluster_id()
来决定当时最合适的集群。这是由 boto3 EMR 功能完成的
emr_client.list_steps()

这里的问题是我的 DAG 正在创建大量(大约每秒 5 次)API 调用(我可以在 cloudtrail 中看到这一点)。这是由

emr_client.list_steps()
函数引起的(我可以在 cloudtrail 中看到这一点)。看起来
get_cluster_id()
函数正在执行,而没有 DAG 处于运行状态。它还抛出一个
ThrottlingException
,我可以在气流 UI 和日志中看到它。

我知道这可能与 DAG 解析有关。我需要更改什么才能仅在运行 DAG 时调用运算符中的

get_cluster_id()
函数?

def get_cluster_id(**kwargs):
        emr_client = boto3.client("emr", 'us-east-1')
        cluster_details = emr_client.list_clusters(ClusterStates=['STARTING','BOOTSTRAPPING','RUNNING','WAITING'])['Clusters']
        cluster_list = []
        for i in range(len(cluster_details)):
            if cluster_details[i]['Name'] in ['cluster1', 'cluster2']: 
                cluster_id = cluster_details[i]['Id']
                nr_of_active_steps = len(emr_client.list_steps(ClusterId=cluster_id, StepStates=['PENDING','RUNNING'])['Steps'])
                cluster_list.append({'cluster_id':cluster_id, 'nr_of_active_steps':nr_of_active_steps})
        return min(cluster_list, key=lambda x:x['nr_of_active_steps'])['cluster_id']   

# Example operator
task1 = PythonOperator(
    task_id='task1',
    provide_context=True,
    python_callable=emr_step_adder,
    retries = 1,
    retry_delay=timedelta(minutes=2),
    op_kwargs={'task_name': 'task1'
        ,'cluster_id': get_cluster_id()
        ,'arg_list':['spark-submit', 's3://test/test.py']
        ,'timeout_in_seconds': 1800
    },
    dag=dag
)
python amazon-web-services airflow boto3 throttling
1个回答
0
投票

只需将

get_cluster_id
移动到
emr_step_adder
内部,而不是将其作为参数传递。

def emr_step_adder(task_name, arg_list, timeout_in_seconds, **context):
    cluster_id = get_cluster_id()
    [...]

task1 = PythonOperator(
    task_id='task1',
    op_kwargs={'task_name': 'task1',
               'arg_list': ['spark-submit', 's3://test/test.py'],
               'timeout_in_seconds': 1800},
    [...]
)

这样它只会在触发 DAG 时执行,而不是在每次解析 DAG 文件时执行。

© www.soinside.com 2019 - 2024. All rights reserved.