我想使用 Apache Beam 将 Transform 应用于侧面输入 PCollection。 应该对基本 PCollection 的每个元素执行侧面输入的转换,并且从相应的元素读取转换的详细信息。 它在某种程度上有效,但它会针对 base_data PCollection 的每个元素触发以下警告:
WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['test.py']
(test.py是我的Python脚本的名称)
更重要的是,在变换内的侧输入 PCollection 上应用变换会导致性能大幅下降。
这是触发此行为的最小示例:
# Using Python 3.10.9 and Apache Beam 2.44.0
import apache_beam as beam
class Test(beam.DoFn):
def process(self, element, side_input):
# This is the PTransform
side_input | beam.Filter(lambda _: True)
yield element
with beam.Pipeline() as pipeline:
base_data = pipeline | 'Create data' >> beam.Create([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
side_input = pipeline | 'Create test data' >> beam.Create([1, 2])
output = (
base_data
| beam.ParDo(Test(), side_input=beam.pvalue.AsIter(side_input))
| beam.Map(print)
)
在此示例中,Test 类中的转换不执行任何操作,但它仍然会触发相关行为。 这个示例已经花费了一秒多的时间来运行,而如果没有这个(非常简单的)侧输入转换,它会立即完成。我正在处理的实际管道当然更复杂,并且需要很长时间(>1 分钟)才能完成,即使应用的转换非常简单并且应用于仅包含 39 个元素的 PCollection。
我想知道在变换内的侧面输入上应用变换是否根本不是您应该做的事情,或者我是否只是做得不正确。
谢谢!
您混淆了 DoFn 与 PTransform 并在代码中混合了这两个概念。
A
DoFn
正在为每个传入元素执行其 process()
函数,这是您应该在单个元素级别提供处理逻辑的地方。这个process
可能会执行多次。在您的情况下,它正在执行多次(并且很可能不必要地经常)被剪断的管道。
这样的一段代码应该进入实际的
PTransform
,特别是它的 expand()
方法。您可能会将 PTransform 视为“管道函数/方法”,您可以使用它来重用管道片段并格式化管道代码。
这里是一个使用两种方式实现简单过滤的例子
import apache_beam as beam
def my_filter_using_side_input(element, side_input):
return element in side_input
# This is a PTransform with a side input
class SideInputPTransform(beam.PTransform):
def __init__(self, side_input):
self.side_input = side_input
def expand(self, pcoll):
# note the return
return (
pcoll
| beam.Filter(my_filter_using_side_input, self.side_input)
)
# This is a DoFn with a side input
class SideInputDoFn(beam.DoFn):
# note the yield
def process(self, element, side_input):
if my_filter_using_side_input(element, side_input):
yield element
with beam.Pipeline() as pipeline:
base_data = pipeline | 'Create data' >> beam.Create([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
side_input = pipeline | 'Create test data' >> beam.Create([1, 2])
# use side_input with DoFn
(
base_data
| "DoFn" >> beam.ParDo(SideInputDoFn(), side_input=beam.pvalue.AsIter(side_input))
| "Print DoFn" >> beam.Map(print)
)
# use side_input with PTransform
(
base_data
| "PTransform" >> SideInputPTransform(side_input=beam.pvalue.AsIter(side_input))
| "Print PTransform" >> beam.Map(print)
)
如果从外部服务动态获取侧面输入,此模式是否有效?例如,使用PeriodicImpulse ... 侧面输入不会在整个捆绑持续时间内更新,对吗?