实施路易动态图形配置

问题描述 投票:10回答:1

我是新来LUIGI,碰到它,同时设计了我们的ML努力管道。虽然它并没有安装到我的特定使用情况下,它有这么多额外的功能,我决定把它适合。

基本上我一直在寻找一个方法,能够坚持一个定制的管道,因此有其结果可重复,更容易部署,读书最多的在线教程后,我尝试使用现有luigi.cfg配置和命令行来实现我的系列化机制,它可能足以为任务的参数,但它没有提供序列化我管线的DAG连接的方式,所以我决定其收到json config file一个WrapperTask那么这将创建所有任务实例和连接所有输入输出的路易吉任务通道(做所有的管道)。

我在此附上一个小的测试程序的审核:

import random
import luigi
import time
import os


class TaskNode(luigi.Task):
    i = luigi.IntParameter()  # node ID

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.required = []

    def set_required(self, required=None):
        self.required = required  # set the dependencies
        return self

    def requires(self):
        return self.required

    def output(self):
        return luigi.LocalTarget('{0}{1}.txt'.format(self.__class__.__name__, self.i))

    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('inside {0}{1}\n'.format(self.__class__.__name__, self.i))
        self.process()

    def process(self):
        raise NotImplementedError(self.__class__.__name__ + " must implement this method")


class FastNode(TaskNode):

    def process(self):
        time.sleep(1)


class SlowNode(TaskNode):

    def process(self):
        time.sleep(2)


# This WrapperTask builds all the nodes 
class All(luigi.WrapperTask):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        num_nodes = 513

        classes = TaskNode.__subclasses__()
        self.nodes = []
        for i in reversed(range(num_nodes)):
            cls = random.choice(classes)

            dependencies = random.sample(self.nodes, (num_nodes - i) // 35)

            obj = cls(i=i)
            if dependencies:
                obj.set_required(required=dependencies)
            else:
                obj.set_required(required=None)

            # delete existing output causing a build all
            if obj.output().exists():
                obj.output().remove()  

            self.nodes.append(obj)

    def requires(self):
        return self.nodes


if __name__ == '__main__':
    luigi.run()

所以,基本上,在问题的标题说,这侧重于动态依赖,并与a 513 node dependency DAG产生p=1/35 connectivity probability,它还类定义了所有的(如在使所有),因为这需要所有节点要建它是一个WrapperTask考虑做(我有一个版本只把它连接到连接DAG组件的头,但我不想过度复杂)。

有没有实现这更标准(Luigic)的方式?特别要注意与TaskNode init和set_required方法不那么漂亮的并发症,我只是因为在init方法上有冲突的方式路易吉接收参数以某种方式登记的参数就是这么做的。我也尝试过其他几种方法,但是这基本上是最体面的一个(即工作)

如果没有一个标准的方式,我仍然希望听到你对我打算去我完成实施框架之前,过程中的任何见解。

python python-3.x luigi data-pipeline
1个回答
1
投票

我昨天answered a similar question与演示。我是基于几乎完全关闭example in the docs.的。在文档,通过yeilding任务分配动态的依赖关系好像他们喜欢的方式。

luigi.Config和动态依赖可能可以给你几乎无限的灵活性管道。他们还描述了虚拟Task调用多重依赖性链here,它可以给你更多的控制权。

© www.soinside.com 2019 - 2024. All rights reserved.