在Apache Beam中添加2个Dofn之间的依赖项

问题描述 投票:2回答:2

有什么方法可以在2个Dofn之间创建依赖关系,以便它将等待第一个Dofn方法完成,然后将运行第二个Dofn方法。只想知道我们如何实现这个用例。

google-cloud-dataflow apache-beam
2个回答
2
投票

也许有一种更清洁的方法,但是我注意到执行以下操作可以达到您想要的效果:

将第一个DoFn的输出也路由到计数器,然后将该计数器的输出作为旁侧输入传递到第二个DoFn的ParDo中

class DoFn2(apache_beam.DoFn):
    def process(self, element, count_do_fn_1_output, *args, **kwargs):
        # ...

do_fn_1_output = do_fn_1_input | 'do fn 1' >> apache_beam.ParDo(DoFn1())

count_do_fn_1_output = (
    do_fn_1_output 
    | 'count do_fn_1_output' >> apache_beam.combiners.Count.Globally())

do_fn_2_output = (
    do_fn_1_output 
    | 'do fn 2' >> apache_beam.ParDo(DoFn2(), count_do_fn_1_output=apache_beam.pvalue.AsSingleton(count_do_fn_1_output)))

0
投票

对于Java SDK,建议您查看Wait转换。 This is an example类似于您想达到的目标。

© www.soinside.com 2019 - 2024. All rights reserved.