AWS Glue 中的可选作业参数?

问题描述 投票:0回答:8

如何为 AWS Glue 作业实施可选参数?

我创建了一个作业,当前有一个字符串参数(ISO 8601 日期字符串)作为 ETL 作业中使用的输入。我想将此参数设为可选,以便作业在未提供时使用默认值(例如,在我的情况下使用 datetime.nowdatetime.isoformat)。我尝试过使用 getResolvedOptions:

import sys
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ['ISO_8601_STRING'])

但是,当我没有传递

--ISO_8601_STRING
作业参数时,我看到以下错误:

awsglue.utils.GlueArgumentError:参数 --ISO_8601_STRING 是必需的

python amazon-web-services aws-glue
8个回答
20
投票
如果您只有一个可选字段,

matsevYuriy 解决方案就很好。

我为 python 编写了一个包装函数,它更通用,可以处理不同的极端情况(强制字段和/或带有值的可选字段)。

import sys    
from awsglue.utils import getResolvedOptions

def get_glue_args(mandatory_fields, default_optional_args):
    """
    This is a wrapper of the glue function getResolvedOptions to take care of the following case :
    * Handling optional arguments and/or mandatory arguments
    * Optional arguments with default value
    NOTE: 
        * DO NOT USE '-' while defining args as the getResolvedOptions with replace them with '_'
        * All fields would be return as a string type with getResolvedOptions

    Arguments:
        mandatory_fields {list} -- list of mandatory fields for the job
        default_optional_args {dict} -- dict for optional fields with their default value

    Returns:
        dict -- given args with default value of optional args not filled
    """
    # The glue args are available in sys.argv with an extra '--'
    given_optional_fields_key = list(set([i[2:] for i in sys.argv]).intersection([i for i in default_optional_args]))

    args = getResolvedOptions(sys.argv,
                            mandatory_fields+given_optional_fields_key)

    # Overwrite default value if optional args are provided
    default_optional_args.update(args)

    return default_optional_args

用途:

# Defining mandatory/optional args
mandatory_fields = ['my_mandatory_field_1','my_mandatory_field_2']
default_optional_args = {'optional_field_1':'myvalue1', 'optional_field_2':'myvalue2'}
# Retrieve args
args = get_glue_args(mandatory_fields, default_optional_args)
# Access element as dict with args[‘key’]

19
投票

Yuriy 的答案移植到 Python 解决了我的问题:

if ('--{}'.format('ISO_8601_STRING') in sys.argv):
    args = getResolvedOptions(sys.argv, ['ISO_8601_STRING'])
else:
    args = {'ISO_8601_STRING': datetime.datetime.now().isoformat()}

8
投票

有一个解决方法可以提供可选参数。这个想法是在解决参数之前检查参数(Scala):

val argName = 'ISO_8601_STRING'
var argValue = null
if (sysArgs.contains(s"--$argName"))
   argValue = GlueArgParser.getResolvedOptions(sysArgs, Array(argName))(argName)

4
投票

我没有看到一种具有可选参数的方法,但您可以在作业本身上指定默认参数,然后如果您在运行作业时不传递该参数,您的作业将收到默认值(请注意,默认值不能为空)。


3
投票

matsev 的答案包装在函数中:

def get_glue_env_var(key, default="none"):
    if f'--{key}' in sys.argv:
        return getResolvedOptions(sys.argv, [key])[key]
    else:
        return default

1
投票

可以创建一个 Step Function,以不同的参数启动相同的 Glue 作业。状态机从选择状态开始,并根据存在的输入使用不同数量的输入。

stepFunctions:
  stateMachines:
    taskMachine:
      role:
        Fn::GetAtt: [ TaskExecutor, Arn ]
      name: ${self:service}-${opt:stage}
      definition:
        StartAt: DefaultOrNot
        States:

          DefaultOrNot:
            Type: Choice
            Choices:
              - Variable: "$.optional_input"
                IsPresent: false
                Next: DefaultTask
              - Variable: "$. optional_input"
                IsPresent: true
                Next: OptionalTask

          OptionalTask:
            Type: Task
            Resource:  "arn:aws:states:::glue:startJobRun.task0"
            Parameters:
              JobName: ${self:service}-${opt:stage}
              Arguments:
                '--log_group.$': "$.specs.log_group"
                '--log_stream.$': "$.specs.log_stream"
                '--optional_input.$': "$. optional_input"

            Catch:
              - ErrorEquals: [ 'States.TaskFailed' ]
                ResultPath: "$.errorInfo"
                Next: TaskFailed
            Next: ExitExecution


          DefaultTask:
            Type: Task
            Resource:  "arn:aws:states:::glue:startJobRun.sync"
            Parameters:
              JobName: ${self:service}-${opt:stage}
              Arguments:
                '--log_group.$': "$.specs.log_group"
                '--log_stream.$': "$.specs.log_stream"


            Catch:
              - ErrorEquals: [ 'States.TaskFailed' ]
                ResultPath: "$.errorInfo"
                Next: TaskFailed
            Next: ExitExecution

          TaskFailed:
            Type: Fail
            Error: "Failure"

          ExitExecution:
            Type: Pass
            End: True

0
投票

AWS Glue 作业需要所有参数,可选参数当前不是一个选项。

我尝试使用可选参数的解决方案,将参数作为 JSON 传递。

 --glue_params = {"var1":"value1", "var2":"value2", "optional_var1":"value3", "optional_var2": null}

在您的代码中,相应地处理参数

import sys
from awsglue.utils import getResolvedOptions
args = getResolvedOptions(sys.argv,['JOB_NAME','glue_params'])

希望这有帮助。


-2
投票

如果您使用该接口,则必须提供以“--”开头的参数名称,例如“--TABLE_NAME”,而不是“TABLE_NAME”,然后您可以像以下(python)代码一样使用它们:

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'TABLE_NAME'])
table_name = args['TABLE_NAME']
© www.soinside.com 2019 - 2024. All rights reserved.