Apache Beam Python > 2.38.0 DirectRunner ~ AssertionError: A total of N watermark-pending bundles did not execute


With Python 3.9 and Apache Beam 2.38.0, the minimal working example below runs fine.

However, with Apache Beam 2.39.0 (or 2.44.0), the example fails with the error

AssertionError: A total of 2 watermark-pending bundles did not execute.

When I switch logging to DEBUG, I see messages of the form "Unable to add bundle for stage", and for both bundles the log shows "Stage input watermark: Timestamp(-9223372036854.775000)" (i.e. timestamp.MIN_TIMESTAMP) and "Bundle schedule watermark: Timestamp(9223372036854.775000)" (i.e. timestamp.MAX_TIMESTAMP).
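As a sanity check on the two watermark values quoted above, here is a small stdlib-only sketch, separate from the minimal working example. It does not import Beam. The helper names are hypothetical, and the idea that the values are the signed 64-bit microsecond range truncated to whole milliseconds is my assumption, which the arithmetic appears to bear out.

```python
# Assumption: the logged watermarks are int64 bounds in microseconds,
# truncated to whole milliseconds and printed as seconds.
INT64_MAX = (1 << 63) - 1
INT64_MIN = -(1 << 63)


def truncate_to_millis(micros: int) -> int:
    """Drop sub-millisecond digits, rounding toward zero (hypothetical helper)."""
    sign = -1 if micros < 0 else 1
    return sign * (abs(micros) // 1000 * 1000)


def format_seconds(micros: int) -> str:
    """Render microseconds as seconds with six fractional digits."""
    sign = "-" if micros < 0 else ""
    whole, frac = divmod(abs(micros), 1_000_000)
    return f"{sign}{whole}.{frac:06d}"


min_micros = truncate_to_millis(INT64_MIN)
max_micros = truncate_to_millis(INT64_MAX)

print(format_seconds(min_micros))
print(format_seconds(max_micros))
```

If this reading is right, the stage input watermark never advanced from its minimum, while the bundles were scheduled for the maximum.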

import logging
import apache_beam as beam


def setup_logging():
    log_format = '[%(asctime)-15s] [%(name)s] [%(levelname)s]: %(message)s'
    logging.basicConfig(format=log_format, level=logging.INFO)
    logging.info("Pipeline Started")


class CreateKvPCollectWithSideInputDoFn(beam.DoFn):
    def __init__(self):
        super().__init__()

    def process(self, element, side_input):
        print(f"side_input_type: {type(side_input)}")
        yield "b", "2"


class CreateKvPCollectDoFn(beam.DoFn):
    def __init__(self):
        super().__init__()

    def process(self, element):
        yield "a", "1"


def main():
    setup_logging()

    pipeline = beam.Pipeline()

    pcollect_input = (
        pipeline
        | "Input/Create" >> beam.Create(["input"])
    )

    kvpcollect_1 = (
        pcollect_input | "PCollection_1" >> beam.ParDo(CreateKvPCollectDoFn())
    )
    beamdict_1 = beam.pvalue.AsDict(kvpcollect_1)

    kvpcollect_2 = (
        pcollect_input
        | "PCollection_2" >> beam.ParDo(
            CreateKvPCollectWithSideInputDoFn(), side_input=beamdict_1
        )
    )

    kvpcollect_3 = (
        (kvpcollect_1, kvpcollect_2)
        | "Flatten" >> beam.Flatten()
    )
    beamdict_3 = beam.pvalue.AsDict(kvpcollect_3)

    (
        pcollect_input
        | "UseBeamDict_3" >> beam.ParDo(CreateKvPCollectWithSideInputDoFn(), side_input=beamdict_3)
        | "PrintResult" >> beam.Map(print)
    )

    result = pipeline.run()
    result.wait_until_finish()


if __name__ == '__main__':
    main()
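
For reference, this is roughly how the logging level can be switched to DEBUG to see the messages mentioned above. It is a minimal sketch using only the standard library: the level parameter and the LOG_FORMAT constant are my additions to setup_logging, not part of the example above.

```python
import logging

LOG_FORMAT = '[%(asctime)-15s] [%(name)s] [%(levelname)s]: %(message)s'


def setup_logging(level=logging.INFO):
    # force=True (Python 3.8+) replaces any handlers installed by an
    # earlier basicConfig call, so the new level actually takes effect.
    logging.basicConfig(format=LOG_FORMAT, level=level, force=True)
    logging.info("Pipeline Started")


# Call this before building the pipeline to get the runner's DEBUG output.
setup_logging(logging.DEBUG)
```

In the example above, this would replace the existing setup_logging function, with main() calling setup_logging(logging.DEBUG).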

I would like to understand why this error seems to be triggered on Apache Beam Python versions above 2.38.0, and whether there is some way to avoid it.

apache-beam python-3.9 direct-runner