如何为 AWS Glue 作业实施可选参数?
我创建了一个作业,当前有一个字符串参数(ISO 8601 日期字符串)作为 ETL 作业中使用的输入。我想将此参数设为可选,以便作业在未提供时使用默认值(例如,在我的情况下使用 datetime.now 和 datetime.isoformat)。我尝试过使用 getResolvedOptions:
import sys
from awsglue.utils import getResolvedOptions
args = getResolvedOptions(sys.argv, ['ISO_8601_STRING'])
但是,当我没有传递
--ISO_8601_STRING
作业参数时,我看到以下错误:
awsglue.utils.GlueArgumentError:参数 --ISO_8601_STRING 是必需的
我为 python 编写了一个包装函数,它更通用,可以处理不同的极端情况(强制字段和/或带有值的可选字段)。
import sys
from awsglue.utils import getResolvedOptions
def get_glue_args(mandatory_fields, default_optional_args):
"""
This is a wrapper of the glue function getResolvedOptions to take care of the following case :
* Handling optional arguments and/or mandatory arguments
* Optional arguments with default value
NOTE:
* DO NOT USE '-' while defining args as the getResolvedOptions with replace them with '_'
* All fields would be return as a string type with getResolvedOptions
Arguments:
mandatory_fields {list} -- list of mandatory fields for the job
default_optional_args {dict} -- dict for optional fields with their default value
Returns:
dict -- given args with default value of optional args not filled
"""
# The glue args are available in sys.argv with an extra '--'
given_optional_fields_key = list(set([i[2:] for i in sys.argv]).intersection([i for i in default_optional_args]))
args = getResolvedOptions(sys.argv,
mandatory_fields+given_optional_fields_key)
# Overwrite default value if optional args are provided
default_optional_args.update(args)
return default_optional_args
用途:
# Defining mandatory/optional args
mandatory_fields = ['my_mandatory_field_1','my_mandatory_field_2']
default_optional_args = {'optional_field_1':'myvalue1', 'optional_field_2':'myvalue2'}
# Retrieve args
args = get_glue_args(mandatory_fields, default_optional_args)
# Access element as dict with args[‘key’]
将 Yuriy 的答案移植到 Python 解决了我的问题:
if ('--{}'.format('ISO_8601_STRING') in sys.argv):
args = getResolvedOptions(sys.argv, ['ISO_8601_STRING'])
else:
args = {'ISO_8601_STRING': datetime.datetime.now().isoformat()}
有一个解决方法可以提供可选参数。这个想法是在解决参数之前检查参数(Scala):
val argName = 'ISO_8601_STRING'
var argValue = null
if (sysArgs.contains(s"--$argName"))
argValue = GlueArgParser.getResolvedOptions(sysArgs, Array(argName))(argName)
我没有看到一种具有可选参数的方法,但您可以在作业本身上指定默认参数,然后如果您在运行作业时不传递该参数,您的作业将收到默认值(请注意,默认值不能为空)。
将 matsev 的答案包装在函数中:
def get_glue_env_var(key, default="none"):
if f'--{key}' in sys.argv:
return getResolvedOptions(sys.argv, [key])[key]
else:
return default
可以创建一个 Step Function,以不同的参数启动相同的 Glue 作业。状态机从选择状态开始,并根据存在的输入使用不同数量的输入。
stepFunctions:
stateMachines:
taskMachine:
role:
Fn::GetAtt: [ TaskExecutor, Arn ]
name: ${self:service}-${opt:stage}
definition:
StartAt: DefaultOrNot
States:
DefaultOrNot:
Type: Choice
Choices:
- Variable: "$.optional_input"
IsPresent: false
Next: DefaultTask
- Variable: "$. optional_input"
IsPresent: true
Next: OptionalTask
OptionalTask:
Type: Task
Resource: "arn:aws:states:::glue:startJobRun.task0"
Parameters:
JobName: ${self:service}-${opt:stage}
Arguments:
'--log_group.$': "$.specs.log_group"
'--log_stream.$': "$.specs.log_stream"
'--optional_input.$': "$. optional_input"
Catch:
- ErrorEquals: [ 'States.TaskFailed' ]
ResultPath: "$.errorInfo"
Next: TaskFailed
Next: ExitExecution
DefaultTask:
Type: Task
Resource: "arn:aws:states:::glue:startJobRun.sync"
Parameters:
JobName: ${self:service}-${opt:stage}
Arguments:
'--log_group.$': "$.specs.log_group"
'--log_stream.$': "$.specs.log_stream"
Catch:
- ErrorEquals: [ 'States.TaskFailed' ]
ResultPath: "$.errorInfo"
Next: TaskFailed
Next: ExitExecution
TaskFailed:
Type: Fail
Error: "Failure"
ExitExecution:
Type: Pass
End: True
AWS Glue 作业需要所有参数,可选参数当前不是一个选项。
我尝试使用可选参数的解决方案,将参数作为 JSON 传递。
--glue_params = {"var1":"value1", "var2":"value2", "optional_var1":"value3", "optional_var2": null}
在您的代码中,相应地处理参数
import sys
from awsglue.utils import getResolvedOptions
args = getResolvedOptions(sys.argv,['JOB_NAME','glue_params'])
希望这有帮助。
如果您使用该接口,则必须提供以“--”开头的参数名称,例如“--TABLE_NAME”,而不是“TABLE_NAME”,然后您可以像以下(python)代码一样使用它们:
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'TABLE_NAME'])
table_name = args['TABLE_NAME']