How to modify a Dataflow RuntimeValueProvider argument when passing it to a ParDo?


I'm having trouble modifying an argument that is passed into an Apache Beam Dataflow pipeline as a `RuntimeValueProvider`. Here is a simplified version of my code:

```python
import apache_beam as beam
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import (GoogleCloudOptions,
                                                  PipelineOptions)


class DataflowFlags(PipelineOptions):

  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_value_provider_argument(
        "--output_path",
        dest="output_path",
        type=str,
        default="./python_extract_output",
    )
    parser.add_argument(
        "--project_id", dest="project_id", default="test"
    )


class ExtractPipelineRunner:

  def __init__(
      self,
      output_path: str,
  ):
    self.output_path = output_path

  def run(self, p: beam.Pipeline) -> None:
    _ = (
        p
        | "Create" >> beam.Create(["hello", "world"])
        # .get() runs here, while the pipeline is being constructed,
        # which is what raises RuntimeValueProviderError.
        | "WriteToText" >> WriteToText(self.output_path.get() + "test/")
    )


def main() -> None:
  pipeline_options = PipelineOptions()
  known_args = pipeline_options.view_as(DataflowFlags)
  pipeline_options.view_as(GoogleCloudOptions).project = known_args.project_id

  with beam.Pipeline(options=pipeline_options) as p:
    extract_runner = ExtractPipelineRunner(known_args.output_path)
    extract_runner.run(p)


if __name__ == "__main__":
  main()
```

I'm trying to modify `output_path` by appending an extra value to it before passing it to `WriteToText`, but I get the following error:

```
apache_beam.error.RuntimeValueProviderError: RuntimeValueProvider(option: output_path, type: str, default_value: './python_extract_output').get() not called from a runtime context
```

How can I fix this? I'd like to be able to modify the `RuntimeValueProvider` argument properly. Thanks for your help!

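I want to trigger a Dataflow template every month and append the current month to the output path as a string (in place of `"test/"`). I tried doing this with a `StaticValueProvider`, but the value does not change from one month to the next.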

python google-cloud-dataflow apache-beam
1 Answer

Have you considered using `NestedValueProvider`? ([documentation][1], [Python implementation][2])

It should let you keep your original `output_path` `ValueProvider` and supply a translator function that modifies it to include the month.

[1]:https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#use-nestedstaticvalueprovider

[2]:https://github.com/apache/beam/blob/37609ba70fab2216edc338121bf2f3a056a1e490/sdks/python/apache_beam/options/value_provider.py#L141
