数据流工作线程中无法访问环境变量

问题描述 投票:0回答:1

有一个使用 Apache Beam 和 Dataflow 作为运行器的 Python 应用程序。该应用程序使用非公共 Python 包“uplight-telemetry”,该包在创建 pipeline_options 对象时使用“extra_packages”进行配置。该包需要一个名为“OTEL_SERVICE_NAME”的环境变量,并且由于 Dataflow Worker 中不存在该变量,因此会在应用程序启动期间导致错误。

我使用自定义管道选项传递此变量。创建管道选项的代码如下-

pipeline_options = ProcessBillRequests.CustomOptions(
    project=gcp_project_id,
    region="us-east1",
    job_name=job_name,
    temp_location=f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/temp',
    staging_location=f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/staging',
    runner='DataflowRunner',
    save_main_session=True,
    service_account_email= service_account,
    subnetwork=os.environ.get(SUBNETWORK_URL),
    extra_packages=[uplight_telemetry_tar_file_path],
    setup_file=setup_file_path,
    OTEL_SERVICE_NAME=otel_service_name,
    OTEL_RESOURCE_ATTRIBUTES=otel_resource_attributes
    # Set values for additional custom variables as needed

而执行管道的代码如下-

result = (
        pipeline
        | "ReadPendingRecordsFromDB" >> read_from_db
        | "Parse input PCollection" >> beam.Map(ProcessBillRequests.parse_bill_data_requests)        
        | "Fetch bills " >> beam.ParDo(ProcessBillRequests.FetchBillInformation())
)
pipeline.run().wait_until_finish()

有没有办法可以在工作人员中可用的自定义选项中设置环境变量?

python google-cloud-platform google-cloud-dataflow apache-beam
1个回答
0
投票

为什么首先使用环境变量?如果我需要在管道启动期间告诉 Dataflow 一些变量,我会使用

argparse

只需将相应的参数添加到您的

main.py
,将参数传递到您的管道并在代码中任何需要的地方访问它们。 让我简单地引用官方的 Beam git 存储库(参见here),而不是编写 MWE。在那里您可能会找到许多其他示例。

© www.soinside.com 2019 - 2024. All rights reserved.