I'm trying to get my BeamRunJavaPipeline() working in Airflow to run a Dataflow job on GCP. I already have the jar file stored in Google Cloud Storage. I'm basically looking for guidance and a working example of how to get this pipeline up and running.
See the required pipeline options:
https://cloud.google.com/dataflow/docs/guides/setting-pipeline-options#set_required_options
It is also worth reviewing the full list of possible pipeline options:
https://cloud.google.com/dataflow/docs/reference/pipeline-options#java
Here is a code example:
from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator

your_java_task = BeamRunJavaPipelineOperator(
    task_id="your_java_task",
    jar="gs://gcs_location_where_your_jar_is_stored",
    runner="DataflowRunner",
    job_class="job_class_that_has_your_main_function",
    pipeline_options={
        "jobName": "your_java_task",
        # the following options are required: https://cloud.google.com/dataflow/docs/guides/setting-pipeline-options#set_required_options
        "project": "your_project_id",
        "tempLocation": "some_gcs_temp_location",
        "stagingLocation": "some_gcs_staging_location",
        "network": "your_network_name",  # you might not have to set this, but larger orgs might require it
        "subnetwork": "https://www.googleapis.com/compute/v1/projects/your_project_id/regions/your_region/subnetworks/your_subnetwork_name",  # you might not have to set this, but larger orgs might require it
        # additional options you will likely also need: https://cloud.google.com/dataflow/docs/reference/pipeline-options
        "serviceAccount": "email_address_of_your_service_account",
        "usePublicIps": False,  # otherwise Dataflow will try to use external IPs, which your org might not allow
        "numWorkers": 1,  # or higher, depending on what's needed
    },
    # setting pipeline_options["region"] = "europe-west4" results in Airflow thinking this task is failing,
    # so the region has to be set via dataflow_config instead.
    dataflow_config={
        "location": "europe-west4",
    },
)
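For context, below is a minimal sketch of a complete DAG file around this task. All placeholder values (DAG id, schedule, GCS paths, job class, service account) are assumptions for illustration, not values from the original setup; BeamRunJavaPipelineOperator itself comes from the apache-airflow-providers-apache-beam package, and its Dataflow handling additionally relies on apache-airflow-providers-google.

# dataflow_java_dag.py -- a minimal sketch; all names and values below are illustrative assumptions
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator

with DAG(
    dag_id="dataflow_java_example",  # assumed DAG id
    start_date=datetime(2024, 1, 1),
    schedule=None,  # no schedule: trigger manually while testing
    catchup=False,
) as dag:
    your_java_task = BeamRunJavaPipelineOperator(
        task_id="your_java_task",
        jar="gs://your-bucket/path/to/your-pipeline.jar",  # assumed GCS path to the jar
        runner="DataflowRunner",
        job_class="com.example.YourMainClass",  # assumed fully qualified main class
        pipeline_options={
            "jobName": "your_java_task",
            "project": "your_project_id",
            "tempLocation": "gs://your-bucket/temp",
            "stagingLocation": "gs://your-bucket/staging",
            "serviceAccount": "your-sa@your_project_id.iam.gserviceaccount.com",
            "usePublicIps": False,
            "numWorkers": 1,
        },
        # the Dataflow region goes here, not in pipeline_options (see the comment above)
        dataflow_config={"location": "europe-west4"},
    )

Once the file is in your DAGs folder, you can trigger it from the Airflow UI or with "airflow dags trigger dataflow_java_example" and watch the job appear in the Dataflow console for the configured region.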