Apache Beam
PTransform
可以附加 with_outputs
和 with_output_types
。例如,
pcoll | beam.CombinePerKey(sum).with_output_types(typing.Tuple[unicode, int])
和
(words | beam.ParDo(ProcessWords(), cutoff_length=2, marker='x')
.with_outputs('above_cutoff_lengths', 'marked strings',
main='below_cutoff_strings')
)
(如果您需要一些上下文,这两个示例均取自 Apache Beam 文档。)
但我似乎找不到任何关于如何组合它们的文档。例如,我可以做这样的事情吗?
(words | beam.ParDo(ProcessWords(), cutoff_length=2, marker='x')
.with_outputs('above_cutoff_lengths', 'marked strings',
main='below_cutoff_strings')
.with_output_types(str, IndexError, str)
)
免责声明:我可能是错的,因为您没有描述实际的问题/错误。此外,
DirectRunner
(在操场上使用)完全忽略任何类型提示!为了验证这确实解决了您的问题,必须在实际考虑类型提示的运行程序中执行(例如,Dataflow
)。
假设您遇到的错误是
TypeError:PTransform.with_output_types() 需要 2 个位置参数 但给了4个
如果您查看 with_output_types
的
documentation那么它只需要一个类型提示。但是,您提供了 3 个单独的。您需要将类型提示包装成
Tuple
,例如
import apache_beam as beam
from typing import Tuple # <- this is the important piece
class DoFnWithOutputs(beam.DoFn):
def process(self, element):
if element == 1:
yield "one"
else:
yield beam.pvalue.TaggedOutput("not_one", False)
with beam.Pipeline() as pipeline:
input_data = pipeline | 'Create data' >> beam.Create([1, 2, 3, 1])
being_one, not_being_one = (
input_data
| "DoFn" >> beam.ParDo(DoFnWithOutputs())
.with_outputs("not_one", main="one")
.with_output_types(Tuple[bool, str]) # Note the wrapping within 'Tuple'
)
(
being_one
| "print 1" >> beam.Map(print)
)
(
not_being_one
| "print != 1" >> beam.Map(print)
)
您可以直接在 Beam Playground 中执行。