解决 AWS Glue 中的并发限制

问题描述 投票:0回答:3

我有一个关于如何最好地管理 AWSglue 中的并发作业实例的问题。

我的工作是这样定义的:

job = client.create_job(
        Name='JOB_NAME', 
        Role='the-role-name',
        ExecutionProperty={
            'MaxConcurrentRuns': 25
        },
        Command={'Name': 'glueetl',
                 'ScriptLocation': script_location,
                 'PythonVersion': '3'},
        Tags={'Application': 'app',
              'Project': 'proj'},
        GlueVersion='2.0',
        WorkerType='G.2X',
        NumberOfWorkers=50
    )

我想像这样调用该作业的大约 1000 个实例:

def run_job(f):
    response = client.start_job_run(
                JobName = JOB_NAME,
                Arguments = {
                    '--start_date':  start_date,
                    '--end_date':  end_date,
                    '--factor':  f} )
    return response


for f in factors:
        response = run_job(f)
        print(f"response: {response}")

这种方法的问题是 #1 立即触发所有这些请求会引发限制错误,#2 如果我在作业启动之间休眠,我仍然会遇到并发限制,即 50。

有人知道解决这些问题的简单方法吗?

python boto3 aws-glue
3个回答
2
投票

“每个账户最大并发作业运行数”限制是软限制 (https://docs.aws.amazon.com/general/latest/gr/glue.html)。也许会向 AWS 记录服务请求并要求增加限制。第二件事是我不确定你是如何在代码中实现睡眠操作的,也许不是每次调用时只执行睡眠捕获异常,如果有异常,则在几秒钟内以指数退避进行睡眠,当睡眠时间结束时重试并重复,直到您得到积极的回应或当您达到自己设定的停止限制时。这样,您的处理将不会停止,直到您放弃,而是在节流开始时减慢速度。


0
投票

您可以使用 Airflow 中的 on_failure 运算符来捕获您正在查找的特定错误。 on_failure 运算符允许您指定任务失败时将调用的函数。在您的情况下,您可以使用 on_failure 运算符调用一个函数,该函数将在延迟后重试该任务,或者监视可用于 Glue 作业的资源,并在 25 个正在运行的作业之一完成时启动它。

以下是如何使用 on_failure 运算符在延迟后重试任务的示例:

Python

'''

def retry_task(context):
    delay = 60 * 5  # 5 minutes

    print("Task failed, retrying in {} seconds".format(delay))
    time.sleep(delay)

    task_id = context["task_id"]
    dag = context["dag"]

    dag.retry(task_id)


dag = DAG("my_dag", start_date=datetime.today())

task = PythonOperator(
    task_id="my_task",
    python_callable=retry_task,
    on_failure=retry_task,
)

dag.add_task(task)

'''

如果失败,此代码将重试任务 my_task。该任务将在延迟 5 分钟后重试。

您还可以使用 on_failure 运算符来监视 Glue 作业的可用资源,并在 25 个正在运行的作业之一完成时启动它。这是一个例子:

Python

'''

def monitor_resources(context):

    job_name = context["task_id"].split(".")[0]
    glue_job = GlueJob(job_name=job_name)

    while True:
        if glue_job.get_running_count() < 25:
        print("There are less than 25 running jobs, kicking off the task")
        break

    time.sleep(10)


dag = DAG("my_dag", start_date=datetime.today())

task = PythonOperator(
    task_id="my_task",
    python_callable=monitor_resources,
    on_failure=monitor_resources,
)

dag.add_task(task)

'''

此代码将监视 Glue 作业 my_task 的可用资源。如果正在运行的作业少于 25 个,则任务将被启动。


0
投票

实际上这可以通过amap任务的step函数来处理,注意map tash可以定义并发级别,或者通过配置参数在胶水作业中设置

© www.soinside.com 2019 - 2024. All rights reserved.