有一个使用 Apache Beam 和 Dataflow 作为运行器的 Python 应用程序。该应用程序使用非公共 Python 包“uplight-telemetry”,该包在创建 pipeline_options 对象时使用“extra_packages”进行配置。该包需要一个名为“OTEL_SERVICE_NAME”的环境变量,并且由于 Dataflow Worker 中不存在该变量,因此会在应用程序启动期间导致错误。
我使用自定义管道选项传递此变量。创建管道选项的代码如下-
pipeline_options = ProcessBillRequests.CustomOptions(
project=gcp_project_id,
region="us-east1",
job_name=job_name,
temp_location=f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/temp',
staging_location=f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/staging',
runner='DataflowRunner',
save_main_session=True,
service_account_email= service_account,
subnetwork=os.environ.get(SUBNETWORK_URL),
extra_packages=[uplight_telemetry_tar_file_path],
setup_file=setup_file_path,
OTEL_SERVICE_NAME=otel_service_name,
OTEL_RESOURCE_ATTRIBUTES=otel_resource_attributes
# Set values for additional custom variables as needed
而执行管道的代码如下-
result = (
pipeline
| "ReadPendingRecordsFromDB" >> read_from_db
| "Parse input PCollection" >> beam.Map(ProcessBillRequests.parse_bill_data_requests)
| "Fetch bills " >> beam.ParDo(ProcessBillRequests.FetchBillInformation())
)
pipeline.run().wait_until_finish()
有没有办法可以在工作人员中可用的自定义选项中设置环境变量?
为什么首先使用环境变量?如果我需要在管道启动期间告诉 Dataflow 一些变量,我会使用
argparse
。
只需将相应的参数添加到您的
main.py
,将参数传递到您的管道并在代码中任何需要的地方访问它们。
让我简单地引用官方的 Beam git 存储库(参见here),而不是编写 MWE。在那里您可能会找到许多其他示例。