我每分钟都运行一个 DAG,从 API 获取汇率,将其放入 CSV 中并将其上传到 S3 存储桶。
但是,我间歇性地收到此错误:FileNotFoundError:[Errno 2]没有这样的文件或目录:'/usr/local/airflow/exchange/exchange_rates_20240418141900.csv'
奇怪的是,这是间歇性的,通常有效,但有时却无效。可能是什么问题? 另外,如果你清除了任务,它有时会起作用。
DAG代码:
def s3_file_upload(ti):
file_path = ti.xcom_pull(dag_id="exchange_rates_dag", task_ids='transform_json')
file_name = os.path.basename(file_path)
source_session = boto3.Session(
aws_access_key_id=Variable.get('access_key_id'),
aws_secret_access_key=Variable.get('secret_access_key')
)
source_s3 = source_session.resource('s3')
s3_bucket = Variable.get('s3_bucket')
source_bucket = source_s3.Bucket(s3_bucket)
response = source_bucket.upload_file(file_path, 'exchange_rates/{}'.format(file_name))
print(response)
由于多个工作人员之间的 Airflow 任务的分布式性质以及这些工作人员之间文件可用性的潜在不一致,MWAA 中可能会出现间歇性的“文件未找到”错误。
如果 Airflow DAG 中的任务需要访问特定文件,请确保这些文件存储在所有工作人员都可以访问的位置,例如 Amazon S3。请参阅 https://docs.astronomer.io/learn/airflow-passing-data- Between-tasks 了解更多信息。
或者,您可以利用 Airflow 的内置功能,例如 Apache Airflow Providers Amazon 软件包中的 Amazon S3 Transfer Operator。该运算符简化了文件从 HTTP 终端节点到 Amazon S3 存储桶的传输,确保任务之间的无缝数据共享。请参阅 https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/transfer/http_to_s3.html 了解更多信息。