文件异步到达的相关数据处理管道

问题描述 投票:0回答:1

我有几个与数据相关的任务/管道,其中一些任务/管道取决于另一个任务的完成。更难的是数据可以异步到达,这意味着某些任务需要wait,直到上一步中的所有文件或任务都已处理。

这里是一个例子:

假设我们有一个带有索引的原始文件x[i,j],其中i代表主要类别j中的一个特定子类别。

我需要运行以下管道:

  1. 管道1:清除原始文件x[i,j]并将其存储为x_clean[i,j]
  2. 管道2:对i中的所有j完成管道1后,将x_clean[i,j]中的结果汇总并存储为y_clean[j]
  3. 管道3:清除原始文件z[j]并将其存储为z_clean[j]
  4. 管道4:完成管道2和管道3后,将z_clean[j]y_clean[j]合并并存储为w_clean[j]

我可以采用哪种模型来处理这种数据流方法?这种数据处理任务背后是否有任何方法论? GCP是否针对此类问题构建了某些东西?

architecture workflow pipeline dataflow data-processing
1个回答
0
投票

在生产过程中...

  • 步骤取决于其他步骤的完成。

  • 材料可以异步到达,这意味着随后的步骤等待产品到达工作状态。但是,请注意,这并不意味着无限的物料会失控,而仅是特定制造订单要消耗的物料。如果您的方案允许涌入无限制的数据流,则您必须对其进行预处理,以避免混合不同的产品组件。不要破坏过程的结构以尝试在某个缓冲区或其他任何缓冲区中处理异步到达的数据,因为制造数据产品涉及关系数据而不是原材料。

  • 子组件可以在分支中完成,这意味着组装步骤在组装开始之前等待相关组件的协调集合到达。

我是迄今为止唯一的协作(制造)体系结构POWER的创建者。关于这个主题,有很多东西要学习,但是您可以在线找到我的文章和代码:http://www.powersemantics.com/

这是您的制造模型中的工作流程:

    class MyProduct
    {
        public object[i,j] x_clean { get; set; }
        public object[j] y_clean { get; set; }
        public object[j] z_clean { get; set; }
        // final product
        public object[j] w_clean { get; set; }
    }
    class MyProcess : Producer<MyProduct>, IProcess, IMachine, IOrganize
    {
        // process inputs
        public object[i,j] x { get; set; }  // raw file
        public object[j] z { get; set; } // raw file

        // machines
        public CleanerA Cleaner1 { get; set; }
        public Aggregator Aggregator1 { get; set }
        public CleanerB Cleaner2 { get; set; }
        public Assembler Assembler1 { get; set; }

        public void D() { // instantiates properties and machines }
        public void O()
        {
            // bind machines to work on the same data points
            // allows maintenance to later remove cleaners if it becomes possible
            // for the process to receive data in the correct form
            Cleaner1.x = x;
            Cleaner1.Product.x_clean = Product.x_clean;

            Aggregator1.x_clean = Product.x_clean;
            Aggregator1.Product.y_clean = Product.y_clean;

            Cleaner2.z = z;
            Cleaner2.Product.z_clean = Product.z_clean;

            Assembler1.z_clean = Product.z_clean;
            Assembler1.y_clean = Product.y_clean;
            Assembler1.Product.w_clean = Product.w_clean;
        }

        // hardcoded synchronous controller
        public void M()
        {
            Cleaner1.M();
            Aggregator1.M();
            Cleaner2.M();
            Assembler1.M();
        }
    }

    // these class pairs are Custom Machines, very specific work organized
    // by user requirements rather than in terms of domain-specific operations
    class CleanerAProduct
    {
        public object[i,j] x_clean { get; set; }
    }
    class CleanerA: Producer<CleanerAProduct>, IMachine
    {
        public object[i,j] x { get; set; }  // raw file
        public void M()
        {
            // clean the raw file x[i,j] and store it as x_clean[i,j]
        }
    }


    class AggregatorProduct
    {
        public object[j] y_clean { get; set; }
    }
    class Aggregator: Producer<AggregatorProduct>, IMachine
    {
        public object[i,j] x_clean { get; set; }
        public void M()
        {
            // aggregate the results from x_clean[i,j] and store it as y_clean[j]
        }
    }


    class CleanerBProduct
    {
        public object[j] z_clean { get; set; }
    }
    class CleanerB : Producer<CleanerBProduct>, IMachine
    {
        public object[j] z { get; set; }
        public void M()
        {
            // clean a raw file z[j] and store it as z_clean[j]
        }
    }


    class AssemblerProduct
    {
        public object[j] w_clean { get; set; }
    }
    class Assembler : Producer<AssemblerProduct>, IMachine
    {
        public object[j] y_clean { get; set; }
        public object[j] z_clean { get; set; }
        public void M()
        {
            // combine z_clean[j] and y_clean[j] and store it as w_clean[j]
        }
    }

生产过程类的常规用法:

  1. 实例化。调用D()实例化机器和产品。
  2. 将所有输入分配给该过程。
  3. 调用O()以使流程将这些输入分配给机器,并绑定机器以在最终产品上运行。这是您在制作前覆盖这些分配的最后机会。
  4. 调用M()以执行该过程。

大多数源代码将生产者和消费者焊接在同一个功能体内,从而在以后进行维护时变得很痛苦,然后功能将数据通过电子邮件发送给彼此,就像无用的上班族一样,他们不保留电子邮件记录。当您以后要做出垂直集成决策(如更换机器或扩展流程)时,就会引起问题,而所有这些我都已在源文件中进行了记录。 POWER是唯一避免集中化等复杂性的体系结构。我在2月发布了它。

有ETL工具和其他解决方案,例如TPL Dataflow,但是生产过程不会为程序员自行组织或管理。所有程序员都需要学习POWER,以正确处理浪费,集成,控制和仪器的职责。当我们编写自动化代码,然后又不能停止实时执行时,用人单位对我们很滑稽,但是我们的教育只是准备好我们创建流程,而不是像制造方法那样构建流程。

© www.soinside.com 2019 - 2024. All rights reserved.