如何在 Apache Beam (Python SDK) 中使用 `with_outputs` 和 `with_output_types`?

问题描述 投票:0回答:1

Apache Beam

PTransform
可以附加
with_outputs
with_output_types
。例如,

pcoll | beam.CombinePerKey(sum).with_output_types(typing.Tuple[unicode, int])

(words | beam.ParDo(ProcessWords(), cutoff_length=2, marker='x')
             .with_outputs('above_cutoff_lengths', 'marked strings',
                           main='below_cutoff_strings')
)

(如果您需要一些上下文,这两个示例均取自 Apache Beam 文档。)

但我似乎找不到任何关于如何组合它们的文档。例如,我可以做这样的事情吗

(words | beam.ParDo(ProcessWords(), cutoff_length=2, marker='x')
             .with_outputs('above_cutoff_lengths', 'marked strings',
                           main='below_cutoff_strings')
             .with_output_types(str, IndexError, str)
)
python apache-beam
1个回答
1
投票

免责声明:我可能是错的,因为您没有描述实际的问题/错误。此外,

DirectRunner
(在操场上使用)完全忽略任何类型提示!为了验证这确实解决了您的问题,必须在实际考虑类型提示的运行程序中执行(例如,
Dataflow
)。

假设您遇到的错误是

TypeError:PTransform.with_output_types() 需要 2 个位置参数 但给了4个

如果您查看 with_output_types

documentation
那么它只需要一个类型提示。但是,您提供了 3 个单独的。您需要将类型提示包装成
Tuple
,例如

import apache_beam as beam
from typing import Tuple  # <- this is the important piece

class DoFnWithOutputs(beam.DoFn):
  def process(self, element):
    if element == 1:
      yield "one"
    else:
      yield beam.pvalue.TaggedOutput("not_one", False)


with beam.Pipeline() as pipeline:
  input_data = pipeline | 'Create data' >> beam.Create([1, 2, 3, 1])

  being_one, not_being_one = (
    input_data
    | "DoFn" >> beam.ParDo(DoFnWithOutputs())
                .with_outputs("not_one", main="one")
                .with_output_types(Tuple[bool, str])   # Note the wrapping within 'Tuple'
  )
  
  (
    being_one
    | "print 1" >> beam.Map(print)
  )

  (
    not_being_one
    | "print != 1" >> beam.Map(print)
  )
  

您可以直接在 Beam Playground 中执行。

© www.soinside.com 2019 - 2024. All rights reserved.