Streaming Dataflow job fails with "Workflow failed." error message


When I run my Dataflow job, it fails very early (no data is processed, and no workers appear to start) with the error message:

Workflow failed.

  • I am trying to run a streaming Dataflow job based on a custom Flex Template.
  • I developed it in Python and can run it in "batch" mode during unit tests.
  • I have checked my quotas and everything looks correct; Compute Engine CPU usage is only at 22%.
  • I have another predefined Dataflow job running with the same service account, so I don't think this is a permissions error.

Do you know where I should investigate? What could cause such an abrupt failure?

Kind regards.
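
For context, one of the job's parameters, `--window-interval-sec=1800` (it appears in the launch args in the log below), groups the streaming Pub/Sub input into 30-minute fixed windows. Here is a minimal pure-Python sketch of how such a fixed window assigns an event timestamp to its bounds; `window_bounds` is a hypothetical helper for illustration, not Beam's `FixedWindows` implementation:

```python
# Minimal pure-Python sketch (hypothetical helper, NOT Beam's FixedWindows
# implementation) of how a fixed window of --window-interval-sec=1800 assigns
# an event timestamp to its half-open window [start, end).
WINDOW_SEC = 1800  # 30 minutes, matching the job parameter

def window_bounds(event_ts: int, size: int = WINDOW_SEC) -> tuple[int, int]:
    """Return the [start, end) bounds of the fixed window containing event_ts."""
    start = event_ts - (event_ts % size)
    return start, start + size

# An event at 2023-06-27T10:59:43Z (epoch 1687863583) lands in the
# 10:30:00-11:00:00 UTC window.
start, end = window_bounds(1687863583)  # → (1687861800, 1687863600)
```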

Here is the full log of the job. At first, everything seems to start smoothly:

INFO 2023-06-27T10:59:43.625252Z Status: Downloaded newer image for europe-west1-docker.pkg.dev/[...]
INFO 2023-06-27T10:59:45.668036Z Created new fluentd log writer for: /var/log/dataflow/template_launcher/runner-json.log
INFO 2023-06-27T10:59:45.669085Z Started template launcher.
INFO 2023-06-27T10:59:45.669291Z Initialize Python template.
INFO 2023-06-27T10:59:45.669317Z Falling back to using template-container args from metadata: template-container-args
INFO 2023-06-27T10:59:45.673562Z Validating metadata template-container-args: {"consoleLogsLocation":"gs://[...]/staging/template_launches/[...]/console_logs","environment":{"region":"europe-west1","serviceAccountEmail":"***@developer.gserviceaccount.com","stagingLocation":"***/staging","tempLocation":"***/tmp"},"jobId":"[...]","jobName":"***","jobObjectLocation":"***/staging/template_launches/[...]/job_object","operationResultLocation":"***/staging/template_launches/[...]/operation_result","parameters":{"app-log-level":"DEBUG","error":"gs://***/error/?format=text","input":"pubsub:///projects/***/topics/***","log-level":"DEBUG","max_num_workers":"2","metrics":"gsecrets:///***://***/output/?format=parquet","staging_location":"***/staging","temp_location":"***/tmp","window-interval-sec":"1800"},"projectId":"***"}
INFO 2023-06-27T10:59:45.674097Z Extracting operation result location.
INFO 2023-06-27T10:59:45.674131Z Operation result location: ***/staging/template_launches/[...]/operation_result
INFO 2023-06-27T10:59:45.674157Z Extracting console log location.
INFO 2023-06-27T10:59:45.674177Z Console logs location: ***/staging/template_launches/[...]/console_logs
INFO 2023-06-27T10:59:45.674201Z Extracting Python command specs.
INFO 2023-06-27T10:59:45.674896Z Generating launch args.
INFO 2023-06-27T10:59:45.675141Z Overriding staging_location with value: ***/staging (previous value: ***/staging)
INFO 2023-06-27T10:59:45.675170Z Overriding temp_location with value: ***/tmp (previous value: ***/tmp)
INFO 2023-06-27T10:59:45.675204Z Validating ExpectedFeatures.
INFO 2023-06-27T10:59:45.675222Z Launching Python template.
INFO 2023-06-27T10:59:45.675260Z Using launch args: [/template/save_to_parquet.py --requirements_file=/template/requirements.txt --job_name=*** --metrics=gsecrets:///*** --input=pubsub:///projects/***/topics/*** --window-interval-sec=1800 --runner=DataflowRunner --template_location=***/staging/template_launches/[...]/job_object --region=europe-west1 --log-level=DEBUG --output=gs://***/output/?format=parquet --error=gs://***/error/?format=text --project=*** --service_account_email=***@developer.gserviceaccount.com --temp_location=***/tmp --max_num_workers=2 --app-log-level=DEBUG --staging_location=***/staging --num_workers=1]
INFO 2023-06-27T10:59:45.675339Z Executing: python /template/save_to_parquet.py --requirements_file=/template/requirements.txt --job_name=*** --metrics=gsecrets:///*** --input=pubsub:///projects/***/topics/*** --window-interval-sec=1800 --runner=DataflowRunner --template_location=***/staging/template_launches/[...]/job_object --region=europe-west1 --log-level=DEBUG --output=gs://***/output/?format=parquet --error=gs://***/error/?format=text --project=*** --service_account_email=***@developer.gserviceaccount.com --temp_location=***/tmp --max_num_workers=2 --app-log-level=DEBUG --staging_location=***/staging --num_workers=1
INFO 2023-06-27T10:59:48.025088Z /usr/local/lib/python3.10/site-packages/apache_beam/io/fileio.py:593: BeamDeprecationWarning: options is deprecated since First stable release. References to <pipeline>.options will not be supported
INFO 2023-06-27T10:59:48.025219Z p.options.view_as(GoogleCloudOptions).temp_location or
INFO 2023-06-27T10:59:48.027368Z INFO:apache_beam.io.fileio:Added temporary directory ***/tmp/.temp2d4e5f71-bb17-4ad1-a501-0b6ea8838124
INFO 2023-06-27T10:59:49.167115Z INFO:apache_beam.runners.portability.stager:Executing command: ['/usr/local/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', '/tmp/tmp3kt4l_pk/tmp_requirements.txt', '--exists-action', 'i', '--no-deps', '--implementation', 'cp', '--abi', 'cp310', '--platform', 'manylinux2014_x86_64']
INFO 2023-06-27T10:59:54.127463Z INFO:apache_beam.runners.portability.stager:Downloading source distribution of the SDK from PyPi
INFO 2023-06-27T10:59:54.127739Z INFO:apache_beam.runners.portability.stager:Executing command: ['/usr/local/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/tmpr07g31e_', 'apache-beam==2.48.0', '--no-deps', '--no-binary', ':all:']
INFO 2023-06-27T11:00:14.273559Z [notice] A new release of pip is available: 23.0.1 -> 23.1.2
INFO 2023-06-27T11:00:14.273666Z [notice] To update, run: pip install --upgrade pip
INFO 2023-06-27T11:00:14.542496Z INFO:apache_beam.runners.portability.stager:Staging SDK sources from PyPI: dataflow_python_sdk.tar
INFO 2023-06-27T11:00:14.543974Z INFO:apache_beam.runners.portability.stager:Downloading binary distribution of the SDK from PyPi
INFO 2023-06-27T11:00:14.544215Z INFO:apache_beam.runners.portability.stager:Executing command: ['/usr/local/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/tmpr07g31e_', 'apache-beam==2.48.0', '--no-deps', '--only-binary', ':all:', '--python-version', '310', '--implementation', 'cp', '--abi', 'cp310', '--platform', 'manylinux2014_x86_64']
INFO 2023-06-27T11:00:15.867609Z [notice] A new release of pip is available: 23.0.1 -> 23.1.2
INFO 2023-06-27T11:00:15.867700Z [notice] To update, run: pip install --upgrade pip
INFO 2023-06-27T11:00:16.093499Z INFO:apache_beam.runners.portability.stager:Staging binary distribution of the SDK from PyPI: apache_beam-2.48.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
INFO 2023-06-27T11:00:16.202858Z INFO:apache_beam.runners.dataflow.dataflow_runner:Pipeline has additional dependencies to be installed in SDK worker container, consider using the SDK container image pre-building workflow to avoid repetitive installations. Learn more on https://cloud.google.com/dataflow/docs/guides/using-custom-containers#prebuild
INFO 2023-06-27T11:00:16.212242Z INFO:root:Default Python SDK image for environment is apache/beam_python3.10_sdk:2.48.0
INFO 2023-06-27T11:00:16.212622Z INFO:root:Using provided Python SDK container image: gcr.io/cloud-dataflow/v1beta3/beam_python3.10_sdk:2.48.0
INFO 2023-06-27T11:00:16.212759Z INFO:root:Python SDK container image set to "gcr.io/cloud-dataflow/v1beta3/beam_python3.10_sdk:2.48.0" for Docker environment
INFO 2023-06-27T11:00:16.781664Z INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO 2023-06-27T11:00:16.782239Z INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO 2023-06-27T11:00:16.805635Z INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to ***/staging/***.1687863616.380928/requirements.txt...
INFO 2023-06-27T11:00:16.926113Z INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to ***/staging/***.1687863616.380928/requirements.txt in 0 seconds.
INFO 2023-06-27T11:00:16.926750Z INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to ***/staging/***.1687863616.380928/pickled_main_session...
INFO 2023-06-27T11:00:17.024959Z INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to ***/staging/***.1687863616.380928/pickled_main_session in 0 seconds.
INFO 2023-06-27T11:00:17.025549Z INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to ***/staging/***.1687863616.380928/dataflow_python_sdk.tar...
INFO 2023-06-27T11:00:17.265944Z INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to ***/staging/***.1687863616.380928/dataflow_python_sdk.tar in 0 seconds.
INFO 2023-06-27T11:00:17.266521Z INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to ***/staging/***.1687863616.380928/apache_beam-2.48.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl...
INFO 2023-06-27T11:00:18.169724Z INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to ***/staging/***.1687863616.380928/apache_beam-2.48.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl in 0 seconds.
INFO 2023-06-27T11:00:18.170524Z INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to ***/staging/***.1687863616.380928/pipeline.pb...
INFO 2023-06-27T11:00:18.238244Z INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to ***/staging/***.1687863616.380928/pipeline.pb in 0 seconds.
INFO 2023-06-27T11:00:18.745066Z INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://***/staging/template_launches/***/job_object...
INFO 2023-06-27T11:00:18.912294Z INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://***/staging/template_launches/***/job_object in 0 seconds.
INFO 2023-06-27T11:00:18.912678Z INFO:apache_beam.runners.dataflow.internal.apiclient:A template was just created at location gs://***/staging/template_launches/***/job_object
INFO 2023-06-27T11:00:19.627536Z Template launch successful.
INFO 2023-06-27T11:00:19.627637Z Uploading console logs to GCS location: gs://***/staging/template_launches/***/console_logs
INFO 2023-06-27T11:00:19.710081Z Successfully uploaded console logs to GCS.
INFO 2023-06-27T11:00:19.710187Z FLEX_TEMPLATES_TAIL_CMD_TIMEOUT_IN_SECS to fetch is not set in env. Using default value: 5
INFO 2023-06-27T11:00:19.710220Z FLEX_TEMPLATES_NUM_LOG_LINES to fetch is not set in env. Using default value: 100
INFO 2023-06-27T11:00:24.712683Z Uploading result file to GCS location: gs://***/staging/template_launches/***/operation_result
INFO 2023-06-27T11:00:25.148819Z py options not set in envsetup file not set in envextra package not set in env
INFO 2023-06-27T11:00:25.242819Z Credentials not provided. Not logging out of Docker private registry.
INFO 2023-06-27T11:00:34.833338709Z Shutting down the Sandbox, launcher-2023062703574013425371552899353440 of type GCE Instance, used for launching.
INFO 2023-06-27T11:00:38.466634757Z Worker configuration: n1-standard-2 in europe-west1-d.

Then some debug output from Beam, which tries to optimize my workflow:

DEBUG 2023-06-27T11:00:40.754549761Z Expanding SplittableParDo operations into optimizable parts.
DEBUG 2023-06-27T11:00:40.791375973Z Expanding CollectionToSingleton operations into optimizable parts.
DEBUG 2023-06-27T11:00:40.861882114Z Expanding CoGroupByKey operations into optimizable parts.
DEBUG 2023-06-27T11:00:40.890661436Z Combiner lifting skipped for step [...]: GroupByKey not followed by a combiner.
[...]
DEBUG 2023-06-27T11:00:41.256597037Z Expanding SplittableProcessKeyed operations into optimizable parts.
DEBUG 2023-06-27T11:00:41.290339380Z Expanding GroupByKey operations into streaming Read/Write steps
DEBUG 2023-06-27T11:00:41.614488065Z Lifting ValueCombiningMappingFns into MergeBucketsMappingFns
DEBUG 2023-06-27T11:00:41.969795862Z Annotating graph with Autotuner information.
DEBUG 2023-06-27T11:00:42.025294160Z Fusing adjacent ParDo, Read, Write, and Flatten operations
DEBUG 2023-06-27T11:00:42.063715064Z Unzipping flatten ref_AppliedPTransform_Save-data-to-gs-***-output-format-parquet-W_34 for input ref_AppliedPTransform_Save-data-to-gs-***-output-format-parquet-W_30.written_files
DEBUG 2023-06-27T11:00:42.089804965Z Fusing unzipped copy of Save data to gs://***/output/?format=parquet/WriteToPandas/Map(<lambda at fileio.py:627>), through flatten Save data to gs://***/output/?format=parquet/WriteToPandas/Flatten, into producer Save data to gs://***/output/?format=parquet/WriteToPandas/ParDo(_WriteUnshardedRecordsFn)/ParDo(_WriteUnshardedRecordsFn)
DEBUG 2023-06-27T11:00:42.123688209Z Unzipping flatten ref_AppliedPTransform_Save-data-to-gs-***-output-format-parquet-W_34-u123 for input ref_AppliedPTransform_Save-data-to-gs-***-output-format-parquet-W_35.None-c121
DEBUG 2023-06-27T11:00:42.155708287Z Fusing unzipped copy of Save data to gs://***/output/?format=parquet/WriteToPandas/GroupTempFilesByDestination/WriteStream, through flatten Save data to gs://***/output/?format=parquet/WriteToPandas/Flatten/Unzipped-1, into producer Save data to gs://***/output/?format=parquet/WriteToPandas/Map(<lambda at fileio.py:627>)
DEBUG 2023-06-27T11:00:42.188468945Z Fusing consumer [...]
[...]
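
For readers unfamiliar with the "Fusing adjacent ParDo..." messages above: the Dataflow optimizer merges consecutive element-wise steps into a single stage so that no intermediate collection is materialized between them. A toy sketch of the idea in plain Python (not the Dataflow optimizer itself):

```python
from functools import reduce

# Toy illustration (plain Python, NOT Dataflow's optimizer) of what "fusing
# adjacent ParDo operations" means: consecutive per-element functions are
# composed into a single fused stage, so elements are never written out
# between the logical steps.
def fuse(*fns):
    """Compose per-element functions left-to-right into one callable."""
    return lambda x: reduce(lambda acc, f: f(acc), fns, x)

# Three logical steps become one fused stage.
fused_stage = fuse(str.strip, str.lower, len)
out = [fused_stage(s) for s in ["  Foo ", "BAR"]]  # → [3, 3]
```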

And finally the error:

DEBUG 2023-06-27T11:00:44.262397762Z Workflow config is missing a default resource spec.
DEBUG 2023-06-27T11:00:44.294972535Z Adding StepResource setup and teardown to workflow graph.
INFO 2023-06-27T11:00:44.331156772Z Running job using Streaming Engine
ERROR 2023-06-27T11:00:44.365334445Z Workflow failed.
DEBUG 2023-06-27T11:00:44.397489706Z Cleaning up.
INFO 2023-06-27T11:00:44.447547675Z Worker pool stopped.
INFO 2023-06-27T11:00:59.262390276Z Sandbox, launcher-2023062703574013425371552899353440, stopped.
python google-cloud-dataflow apache-beam
1 Answer
0 votes

Sorry for writing this here, but I am not able to comment. I am running into the same issue; did you ever solve it?
