我有一个关于如何最好地管理 AWSglue 中的并发作业实例的问题。
我的工作是这样定义的:
job = client.create_job(
Name='JOB_NAME',
Role='the-role-name',
ExecutionProperty={
'MaxConcurrentRuns': 25
},
Command={'Name': 'glueetl',
'ScriptLocation': script_location,
'PythonVersion': '3'},
Tags={'Application': 'app',
'Project': 'proj'},
GlueVersion='2.0',
WorkerType='G.2X',
NumberOfWorkers=50
)
我想像这样调用该作业的大约 1000 个实例:
def run_job(f):
response = client.start_job_run(
JobName = JOB_NAME,
Arguments = {
'--start_date': start_date,
'--end_date': end_date,
'--factor': f} )
return response
for f in factors:
response = run_job(f)
print(f"response: {response}")
这种方法的问题是 #1 立即触发所有这些请求会引发限制错误,#2 如果我在作业启动之间休眠,我仍然会遇到并发限制,即 50。
有人知道解决这些问题的简单方法吗?
“每个账户最大并发作业运行数”限制是软限制 (https://docs.aws.amazon.com/general/latest/gr/glue.html)。也许会向 AWS 记录服务请求并要求增加限制。第二件事是我不确定你是如何在代码中实现睡眠操作的,也许不是每次调用时只执行睡眠捕获异常,如果有异常,则在几秒钟内以指数退避进行睡眠,当睡眠时间结束时重试并重复,直到您得到积极的回应或当您达到自己设定的停止限制时。这样,您的处理将不会停止,直到您放弃,而是在节流开始时减慢速度。
您可以使用 Airflow 中的 on_failure 运算符来捕获您正在查找的特定错误。 on_failure 运算符允许您指定任务失败时将调用的函数。在您的情况下,您可以使用 on_failure 运算符调用一个函数,该函数将在延迟后重试该任务,或者监视可用于 Glue 作业的资源,并在 25 个正在运行的作业之一完成时启动它。
以下是如何使用 on_failure 运算符在延迟后重试任务的示例:
Python
'''
def retry_task(context):
delay = 60 * 5 # 5 minutes
print("Task failed, retrying in {} seconds".format(delay))
time.sleep(delay)
task_id = context["task_id"]
dag = context["dag"]
dag.retry(task_id)
dag = DAG("my_dag", start_date=datetime.today())
task = PythonOperator(
task_id="my_task",
python_callable=retry_task,
on_failure=retry_task,
)
dag.add_task(task)
'''
如果失败,此代码将重试任务 my_task。该任务将在延迟 5 分钟后重试。
您还可以使用 on_failure 运算符来监视 Glue 作业的可用资源,并在 25 个正在运行的作业之一完成时启动它。这是一个例子:
Python
'''
def monitor_resources(context):
job_name = context["task_id"].split(".")[0]
glue_job = GlueJob(job_name=job_name)
while True:
if glue_job.get_running_count() < 25:
print("There are less than 25 running jobs, kicking off the task")
break
time.sleep(10)
dag = DAG("my_dag", start_date=datetime.today())
task = PythonOperator(
task_id="my_task",
python_callable=monitor_resources,
on_failure=monitor_resources,
)
dag.add_task(task)
'''
此代码将监视 Glue 作业 my_task 的可用资源。如果正在运行的作业少于 25 个,则任务将被启动。
实际上这可以通过amap任务的step函数来处理,注意map tash可以定义并发级别,或者通过配置参数在胶水作业中设置