我正在使用Luigi启动AWS Batch作业。我想创建luigi.contrib.batch.BatchTask
的子类(可在此处找到有关Luigi for AWS Batch的文档(依赖于boto3):https://luigi.readthedocs.io/en/stable/_modules/luigi/contrib/batch.html#BatchTask)
Class BatchTask类:
class BatchTask(luigi.Task):
"""
Base class for an Amazon Batch job
Amazon Batch requires you to register "job definitions", which are JSON
descriptions for how to issue the ``docker run`` command. This Luigi Task
requires a pre-registered Batch jobDefinition name passed as a Parameter
:param job_definition (str): name of pre-registered jobDefinition
:param job_name: name of specific job, for tracking in the queue and logs.
:param job_queue: name of job queue where job is going to be submitted.
"""
job_definition = luigi.Parameter()
job_name = luigi.OptionalParameter(default=None)
job_queue = luigi.OptionalParameter(default=None)
poll_time = luigi.IntParameter(default=POLL_TIME)
def run(self):
bc = BatchClient(self.poll_time)
job_id = bc.submit_job(
self.job_definition,
self.parameters,
job_name=self.job_name,
queue=self.job_queue)
bc.wait_on_job(job_id)
@property
def parameters(self):
"""Override to return a dict of parameters for the Batch Task"""
return {}
对于我的子类,我想覆盖参数功能,以便可以使用代码传递自己的参数,而不是依靠AWS Web界面手动填充参数。此参数函数在函数上方具有@property
标记,它向我显示它是一个装饰器-我不太熟悉。
我认为我的问题与上述情况无关,而是:
如何覆盖参数函数,以便它不返回空字典,而是返回我定义的字典?
书面子类:
import luigi.contrib.batch as batch
batch_job_revision_number=1
class SubclassLuigiBatchTask(batch.BatchTask):
job_definition='arn:aws:batch:{0}:job-definition/{1}:{2}'.format(
'aws-credentials-that-i-cannot-share',
'aws_pre-registered-job-description-name',
batch_job_revision_number)
job_name='my_example_job'
job_queue='my_example_queue'
poll_time = 10
task = batch.BatchTask(
job_definition='arn:aws:batch:{0}:job-definition/{1}:{2}'.format(
'aws-credentials-that-i-cannot-share',
'aws_pre-registered-job_description',
batch_job_revision_number),
job_name='my_example_job',
job_queue='my_example_queue',
poll_time=10
)
@task.parameters
def parameters(self):
return {
"job_definition": "df -h"
}
# This run function runs the predefined job (job definition)
def run(self):
self.task.run()
# function only runs if output doesn't exist yet
def output(self):
return LocalTarget('/home/some_user/task_completed.txt')
上面的示例返回错误:
TypeError: 'dict' object is not callable
如何访问setter函数以覆盖此默认值return {}
?我尝试在命令行上查看task.parameters
可用的功能,以查看可用的方法:
>>> dir(task.parameters)
['__class__', '__contains__', '__delattr__', '__delitem__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getitem__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__iter__', '__le__', '__len__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__setitem__', '__sizeof__', '__str__', '__subclasshook__', 'clear', 'copy', 'fromkeys', 'get', 'items', 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values']
代替
@task.parameters
def parameters(self):
...
您需要做:
@property
def parameters(self):
...
用装饰器替代属性还不错。只需看下面的例子:
class A:
def __init__(self):
self._a = "3"
@property
def a(self):
return self._a
class B(A):
def __init__(self):
super(B, self).__init__()
self._b = "4"
@property
def a(self):
return self._b
a = A()
b = B()
print('a', a.a)
print('b', b.a)
这将打印出:
a 3
b 4
[我认为添加怪异的装饰器@task.parameters
可能会弄乱它。可能实际上是想对自己进行递归?非常奇怪的东西。