MWAA 中间歇性找不到文件错误

问题描述 投票:0回答:1

我每分钟都运行一个 DAG,从 API 获取汇率,将其放入 CSV 中并将其上传到 S3 存储桶。

但是,我间歇性地收到此错误:FileNotFoundError:[Errno 2]没有这样的文件或目录:'/usr/local/airflow/exchange/exchange_rates_20240418141900.csv'

奇怪的是,这是间歇性的,通常有效,但有时却无效。可能是什么问题? 另外,如果你清除了任务,它有时会起作用。

DAG代码:

def s3_file_upload(ti):
    file_path = ti.xcom_pull(dag_id="exchange_rates_dag", task_ids='transform_json')
    file_name = os.path.basename(file_path)
    source_session = boto3.Session(
        aws_access_key_id=Variable.get('access_key_id'),
        aws_secret_access_key=Variable.get('secret_access_key')
    )

    source_s3 = source_session.resource('s3')

    s3_bucket = Variable.get('s3_bucket')
    source_bucket = source_s3.Bucket(s3_bucket)
    response = source_bucket.upload_file(file_path, 'exchange_rates/{}'.format(file_name))
    print(response) 
airflow mwaa
1个回答
0
投票

由于多个工作人员之间的 Airflow 任务的分布式性质以及这些工作人员之间文件可用性的潜在不一致,MWAA 中可能会出现间歇性的“文件未找到”错误。

如果 Airflow DAG 中的任务需要访问特定文件,请确保这些文件存储在所有工作人员都可以访问的位置,例如 Amazon S3。请参阅 https://docs.astronomer.io/learn/airflow-passing-data- Between-tasks 了解更多信息。

或者,您可以利用 Airflow 的内置功能,例如 Apache Airflow Providers Amazon 软件包中的 Amazon S3 Transfer Operator。该运算符简化了文件从 HTTP 终端节点到 Amazon S3 存储桶的传输,确保任务之间的无缝数据共享。请参阅 https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/transfer/http_to_s3.html 了解更多信息。

© www.soinside.com 2019 - 2024. All rights reserved.