有条件地一个接一个地执行多个分支

问题描述 投票:0回答:1

  • 请仔细阅读并理解问题
  • 无法通过简单的BranchPythonOperator / BranchPythonOperator来解决

我们在工作流程中有一个不寻常的ShortCircuitOperator类用例

ShortCircuitOperator

该流程应按如下方式工作

  • multiplexer监听外部队列中的事件(单个队列)
  • 队列中的每个事件都会触发分支之一的执行(分支n。开始任务)
  • 一对一,事件到达时,MUX任务必须触发各个分支的执行
  • 一旦所有分支都被触发,MUX任务完成

假设

  • 正好 +-----------------------+ | | +------------>+ branch-1.begin-task | | | | | +-----------------------+ | | | +-----------------------+ | | | +------------>+ branch-2.begin-task | | | | +------------+ | +-----------------------+ | | | | MUX-task +----+ + | | | | +------------+ | | | +- -- -- -- -> | | | | | | + | | +-----------------------+ | | | +------------>+ branch-n.begin-task | | | +-----------------------+ 个事件到达队列,一个事件触发每个分支
  • MUX-task动态已知:其值在n]中定义>
  • 限制

  • 事件到达的外部队列是仅一个
  • 我们没有n队列(每个分支一个),因为分支随着时间增长(动态定义了n)

  • 我们无法在Airflow的Variable集合中(或在Variable内进行的任何此类事情提供解决方案,以构建此解决方案]

  1. n s可用于侦听外部队列上的事件;但是我们必须听多个事件,而不是一个
  2. operators and sensors可用于触发执行多个分支中的单个分支,但立即执行Airflow
  3. 主要瓶颈

由于上述第二个限制,即使是将SensorBranchPythonOperator的功能组合在一起的自定义运算符也无法使用。

[我们试图围绕BranchPythonOperatormarks remaining branches as skippedSensor的奇特组合进行集思广益,但到目前为止还没有成功。

这在气流中可行吗?


UPDATE-1

这里有一些背景信息来了解工作流的上下文

  • 我们有一个ETL管道将BranchPythonOperator表(跨多个Sensors数据库)同步到我们的数据仓库
  • 为了克服同步管道对生产数据库的影响,我们决定这样做
    • 对于每个数据库,创建一个快照
    • (上次备份的DummyOperator
    • 使用该[[快照
    • 运行trigger_rules同步管道然后在同步结束时,终止
    • 快照
    • trigger_rules群集]
MySQL快照的[Aurora]
  • 还原过程
  • 已发布到restore AuroraDB cluster队列
      所有数据库的单个队列
  • 此设置是由我们的DevOps团队完成的(不同的AWS帐户,我们无权访问基础AuroraDB s /MySQL/下文)
  • 注意,请仔细阅读并理解问题,不能通过简单的BranchPythonOperator / ShortCircuitOperator来解决。我们的工作流程中有一个与多路复用器类似的用例...
  • airflow
    1个回答
    0
    投票
    AuroraDB来抢救!
    © www.soinside.com 2019 - 2024. All rights reserved.