Airflow GoogleCloudStorageToGoogleCloudStorageOperator error


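For two weeks now I have been hitting a strange problem in my DAG. My use case is as follows: a colleague manually uploads a file to a GCS bucket. This triggers a Cloud Function, which in turn starts an Airflow DAG through the API. The DAG's first task transfers the file from the "landing zone" to the "safe zone", and then the rest of the DAG continues.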
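For illustration only, the hand-off from the upload event to the DAG run could be sketched as below. The function name `build_dag_run_conf` is hypothetical and the Cloud Function's actual code is not shown in the question; the sketch only assumes that the storage event carries the object path under the `name` key and that the DAG reads it as `dag_run.conf['file_name']`.

```python
def build_dag_run_conf(event: dict) -> dict:
    """Build the `conf` payload passed to the DAG run (hypothetical helper)."""
    # A GCS-triggered Cloud Function receives the bucket in event["bucket"]
    # and the full object path in event["name"].
    return {"file_name": event["name"]}


# Example event, using the object name that appears in the task log.
example_event = {
    "bucket": "blablabla-landing",
    "name": "geometrie/Track_Geometry-20201005_032915.csv",
}
print(build_dag_run_conf(example_event))
```

If the function runs more than once for the same upload, each call produces the same `conf`, so every resulting DAG run would target the same source object.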

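I use GoogleCloudStorageToGoogleCloudStorageOperator to move the file from bucket A to bucket B. Everything worked fine until two or three weeks ago. The oldest DAG is 6 months old, and even though we changed a few things, the changes were in another part of the DAG. So we never touched this part, and we had no problems for a long time.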

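Now the first task (the transfer) fails most of the time. The file itself is moved correctly, but for some unknown reason the task fails. If I retry, I get this error 2 or 3 times in a row, and then the next attempt with exactly the same file... works. I cannot find what is causing this. I'm going crazy.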

--------------------------------------------------------------------------------
[2020-10-15 09:34:50,599] {taskinstance.py:867} INFO - Starting attempt 1 of 1
[2020-10-15 09:34:50,599] {taskinstance.py:868} INFO - 
--------------------------------------------------------------------------------
[2020-10-15 09:34:50,620] {taskinstance.py:887} INFO - Executing <Task(GoogleCloudStorageToGoogleCloudStorageOperator): transfer-landing-to-safe> on 2020-10-15T07:34:40+00:00
[2020-10-15 09:34:50,626] {standard_task_runner.py:53} INFO - Started process 31555 to run task
[2020-10-15 09:34:50,775] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: geometrie-preprocessing.transfer-landing-to-safe 2020-10-15T07:34:40+00:00 [running]> blablabla.internal
[2020-10-15 09:34:50,860] {gcs_to_gcs.py:193} INFO - Executing copy of gs://blablabla-landing/geometrie/Track_Geometry-20201005_032915.csv to gs://blablabla-safe/geometrie/original/track_geometry_20201005_032915.csv
[2020-10-15 09:34:50,861] {logging_mixin.py:112} INFO - [2020-10-15 09:34:50,860] {gcp_api_base_hook.py:146} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2020-10-15 09:34:50,980] {taskinstance.py:1128} ERROR - 404 POST https://storage.googleapis.com/storage/v1/b/blablabla-landing/o/geometrie%2FTrack_Geometry-20201005_032915.csv/rewriteTo/b/blablabla-safe/o/geometrie%2Foriginal%2Ftrack_geometry_20201005_032915.csv: No such object: blablabla-landing/geometrie/Track_Geometry-20201005_032915.csv
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/dist-packages/airflow/contrib/operators/gcs_to_gcs.py", line 178, in execute
    destination_object=self.destination_object)
  File "/usr/local/lib/python3.7/dist-packages/airflow/contrib/operators/gcs_to_gcs.py", line 196, in _copy_single_object
    self.destination_bucket, destination_object)
  File "/usr/local/lib/python3.7/dist-packages/airflow/contrib/hooks/gcs_hook.py", line 135, in rewrite
    source=source_object
  File "/usr/local/lib/python3.7/dist-packages/google/cloud/storage/blob.py", line 2098, in rewrite
    timeout=timeout,
  File "/usr/local/lib/python3.7/dist-packages/google/cloud/_http.py", line 423, in api_request
    raise exceptions.from_http_response(response)
google.api_core.exceptions.NotFound: 404 POST https://storage.googleapis.com/storage/v1/b/blablabla-landing/o/geometrie%2FTrack_Geometry-20201005_032915.csv/rewriteTo/b/blablabla-safe/o/geometrie%2Foriginal%2Ftrack_geometry_20201005_032915.csv: No such object: blablabla-landing/geometrie/Track_Geometry-20201005_032915.csv
[2020-10-15 09:34:50,984] {taskinstance.py:1185} INFO - Marking task as FAILED.dag_id=geometrie-preprocessing, task_id=transfer-landing-to-safe, execution_date=20201015T073440, start_date=20201015T073450, end_date=20201015T073450
[2020-10-15 09:35:00,556] {logging_mixin.py:112} INFO - [2020-10-15 09:35:00,556] {local_task_job.py:103} INFO - Task exited with return code 1

The operator:

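from airflow.contrib.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator

# env_extension is defined elsewhere in the DAG file (not shown here).
transfer_landing_to_safe = GoogleCloudStorageToGoogleCloudStorageOperator(
    task_id=f"transfer-landing-to-safe{env_extension}",
    source_bucket=f"blablabla-landing{env_extension}",
    source_object="{{ dag_run.conf['file_name'] }}",
    destination_bucket=f"blablabla-safe{env_extension}",
    destination_object="geometrie/original/track_geometry_{{ dag_run.conf['file_name'][-19:] }}",
    move_object=True,  # move instead of copy: the source object is removed afterwards
    google_cloud_storage_conn_id="gcp_conn",
)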
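As a small check of the `destination_object` template, the sketch below applies the same `[-19:]` slice in plain Python to the source object name from the log. The helper name `render_destination` is hypothetical; it only mimics what the Jinja expression evaluates to and does not call Airflow.

```python
def render_destination(file_name: str) -> str:
    """Mimic the destination_object template for a given dag_run.conf['file_name']."""
    # [-19:] keeps the trailing "YYYYMMDD_HHMMSS.csv" part:
    # 8 (date) + 1 (underscore) + 6 (time) + 4 (".csv") = 19 characters.
    return f"geometrie/original/track_geometry_{file_name[-19:]}"


source = "geometrie/Track_Geometry-20201005_032915.csv"
print(render_destination(source))
# geometrie/original/track_geometry_20201005_032915.csv
```

The result matches the destination path in the log line from gcs_to_gcs.py. The slice is fixed-width, so it only works while every uploaded file name ends in that 19-character pattern.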
python google-cloud-storage airflow
1 Answer

This does not answer the question itself, but in newer Airflow versions GoogleCloudStorageToGoogleCloudStorageOperator is deprecated in favor of GCSToGCSOperator:

from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator

https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/transfers/gcs_to_gcs/index.html#airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSToGCSOperator

Hopefully this helps other users who need to upgrade their Airflow version.
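As a rough sketch of what the migration might look like for the task in the question, the helper below collects the same arguments in the form GCSToGCSOperator expects. The helper name `gcs_to_gcs_kwargs` is hypothetical, and the sketch assumes a provider release in which the connection argument is called `gcp_conn_id` rather than `google_cloud_storage_conn_id`; check the API page linked above for the version you install.

```python
def gcs_to_gcs_kwargs(env_extension: str = "") -> dict:
    """Keyword arguments for GCSToGCSOperator mirroring the task in the question."""
    return {
        "task_id": f"transfer-landing-to-safe{env_extension}",
        "source_bucket": f"blablabla-landing{env_extension}",
        # Jinja templates are kept as plain strings; Airflow renders them at run time.
        "source_object": "{{ dag_run.conf['file_name'] }}",
        "destination_bucket": f"blablabla-safe{env_extension}",
        "destination_object": (
            "geometrie/original/track_geometry_"
            "{{ dag_run.conf['file_name'][-19:] }}"
        ),
        "move_object": True,
        # Assumption: the provider operator takes gcp_conn_id instead of
        # google_cloud_storage_conn_id.
        "gcp_conn_id": "gcp_conn",
    }


# Inside a DAG file, with apache-airflow-providers-google installed:
# from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
# transfer_landing_to_safe = GCSToGCSOperator(**gcs_to_gcs_kwargs())
```

Switching operators by itself is not expected to change the 404 behaviour described in the question; it only removes the deprecated import.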
