注
BranchPythonOperator
/ BranchPythonOperator
来解决我们在工作流程中有一个不寻常的ShortCircuitOperator
类用例
ShortCircuitOperator
该流程应按如下方式工作
假设
+-----------------------+
| |
+------------>+ branch-1.begin-task |
| | |
| +-----------------------+
|
|
| +-----------------------+
| | |
+------------>+ branch-2.begin-task |
| | |
+------------+ | +-----------------------+
| | |
| MUX-task +----+ +
| | | |
+------------+ |
| |
+- -- -- -- ->
| |
|
| |
| +
|
| +-----------------------+
| | |
+------------>+ branch-n.begin-task |
| |
+-----------------------+
个事件到达队列,一个事件触发每个分支MUX-task
是动态已知:其值在n
]中定义>限制
n
队列(每个分支一个),因为分支随着时间增长(动态定义了n)我们无法在Airflow的Variable
集合中(或在Variable
内进行的任何此类事情提供解决方案,以构建此解决方案]
n
s可用于侦听外部队列上的事件;但是我们必须听多个事件,而不是一个Airflow
主要瓶颈
由于上述第二个限制,即使是将Sensor
和BranchPythonOperator
的功能组合在一起的自定义运算符也无法使用。
[我们试图围绕BranchPythonOperator
,marks remaining branches as skipped和Sensor
的奇特组合进行集思广益,但到目前为止还没有成功。
这在气流中可行吗?
UPDATE-1
这里有一些背景信息来了解工作流的上下文
BranchPythonOperator
表(跨多个Sensors
数据库)同步到我们的数据仓库DummyOperator
)trigger_rules
同步管道然后在同步结束时,终止trigger_rules
群集]MySQL
快照的[Aurora
] AuroraDB
cluster队列AuroraDB
s /MySQL
/下文)AuroraDB
来抢救!