I want to download the most recently created file from an S3 folder.
Example S3 paths:
my_Bucket/folder_1/folder_2/folder_3/folder_4/str_str2_2021_03_str3.csv
my_Bucket/folder_1/folder_2/folder_3/folder_4/str_str2_2023_04_str3.csv
my_Bucket/folder_1/folder_2/folder_3/folder_4/str_str2_2022_05_str3.csv
my_Bucket/folder_1/folder_2/folder_3/folder_4/str_str2_2021_05_str3.csv
I should download the most recently created file, which for this list is:
str_str2_2023_04_str3.csv
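Because these file names embed a year and month (for example `_2023_04_`), one option is to pick the newest file by parsing that date from the name instead of relying on S3 metadata. This is a minimal sketch, assuming every relevant file name contains a single `_YYYY_MM_` group; the function name `latest_by_name_date` is hypothetical:

```python
import re
from typing import Iterable, Optional

# Assumption: the date appears as _YYYY_MM_ somewhere in the file name.
DATE_PATTERN = re.compile(r"_(\d{4})_(\d{2})_")


def latest_by_name_date(keys: Iterable[str]) -> Optional[str]:
    """Return the key whose embedded (year, month) is the most recent, or None."""
    best_key = None
    best_date = None
    for key in keys:
        file_name = key.rsplit("/", 1)[-1]  # ignore the "folder" part of the key
        match = DATE_PATTERN.search(file_name)
        if match is None:
            continue  # skip keys without a recognizable date
        date = (int(match.group(1)), int(match.group(2)))
        if best_date is None or date > best_date:
            best_key, best_date = key, date
    return best_key


keys = [
    "folder_1/folder_2/folder_3/folder_4/str_str2_2021_03_str3.csv",
    "folder_1/folder_2/folder_3/folder_4/str_str2_2023_04_str3.csv",
    "folder_1/folder_2/folder_3/folder_4/str_str2_2022_05_str3.csv",
    "folder_1/folder_2/folder_3/folder_4/str_str2_2021_05_str3.csv",
]
print(latest_by_name_date(keys))
# prints folder_1/folder_2/folder_3/folder_4/str_str2_2023_04_str3.csv
```

Comparing `(year, month)` tuples avoids relying on string ordering, so it keeps working even if the text before the date differs between files.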
I created a method get_file_folders() to get all the files in folder_4:
str_str2_2021_03_str3.csv
str_str2_2023_04_str3.csv
str_str2_2022_05_str3.csv
str_str2_2021_05_str3.csv
import boto3

session_root = boto3.Session(region_name='eu-west-3', profile_name='my_profile')
s3_client = session_root.client('s3')

def get_file_folders(s3_client, bucket_name, prefix=""):
    file_names = []
    folders = []
    default_kwargs = {
        "Bucket": bucket_name,
        "Prefix": prefix
    }
    next_token = ""
    while next_token is not None:
        updated_kwargs = default_kwargs.copy()
        if next_token != "":
            updated_kwargs["ContinuationToken"] = next_token
        # Pass updated_kwargs (not default_kwargs) so the next page is actually requested
        response = s3_client.list_objects_v2(**updated_kwargs)
        # "Contents" is missing from the response when nothing matches the prefix
        contents = response.get("Contents", [])
        for result in contents:
            key = result.get("Key")
            if key[-1] == "/":
                folders.append(key)  # zero-byte "folder" placeholder object
            else:
                file_names.append(key)
        next_token = response.get("NextContinuationToken")
    for file in file_names:
        # The file name is the last segment of the key
        file_name_child = file.split("/")[-1]
        print(file_name_child)
    return file_names, folders
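The manual `ContinuationToken` loop above can also be written with boto3's built-in paginator for `list_objects_v2`, which follows the continuation tokens itself. This is a minimal sketch; the function name `list_file_keys` is hypothetical, and keys ending in `/` are treated as folder placeholders, as in the method above:

```python
from typing import List, Tuple


def list_file_keys(s3_client, bucket_name: str, prefix: str = "") -> Tuple[List[str], List[str]]:
    """Return (file_keys, folder_keys) for every object under `prefix`."""
    file_keys: List[str] = []
    folder_keys: List[str] = []
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        # "Contents" is absent from a page when nothing matches the prefix
        for obj in page.get("Contents", []):
            key = obj["Key"]
            if key.endswith("/"):
                folder_keys.append(key)  # "folder" placeholder object
            else:
                file_keys.append(key)
    return file_keys, folder_keys
```

For example, `list_file_keys(s3_client, "my_Bucket", "folder_1/folder_2/folder_3/folder_4/")` would return the same two lists as `get_file_folders` for that prefix.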
This is my download method; I would like to modify it so that it downloads only the most recently created file:
from pathlib import Path

def download_files(s3_client, bucket_name, local_path, file_names, folders):
    local_path = Path(local_path)
    # Recreate the S3 "folder" structure locally
    for folder in folders:
        folder_path = local_path / folder
        folder_path.mkdir(parents=True, exist_ok=True)
    # Download every file, creating its parent directory first
    for file_name in file_names:
        file_path = local_path / file_name
        file_path.parent.mkdir(parents=True, exist_ok=True)
        s3_client.download_file(
            bucket_name,
            file_name,
            str(file_path)
        )
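To download only one file, the loop over `file_names` is not needed: choose the single key you want (for example the newest one) and download just that key. This is a minimal sketch that keeps the layout of the method above, where the local file mirrors the S3 key under `local_path`; `download_single_file` is a hypothetical name:

```python
from pathlib import Path


def download_single_file(s3_client, bucket_name: str, local_path, key: str) -> Path:
    """Download one S3 object to local_path/key and return the local path."""
    file_path = Path(local_path) / key
    # Create the local folders that mirror the S3 "folders" in the key
    file_path.parent.mkdir(parents=True, exist_ok=True)
    s3_client.download_file(bucket_name, key, str(file_path))
    return file_path
```

With the example keys, `download_single_file(s3_client, "my_Bucket", "downloads", max(file_names))` would fetch the `2023_04` file, but only because all names share the same prefix and a fixed-width `YYYY_MM` date, so plain string ordering happens to match date ordering. For other naming schemes, select the key by a parsed date or by `LastModified` instead.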
Do you know how I can modify my download method to download only the most recently created file? Or do you have another solution?
Thank you
If you want to find the object with the most recent LastModified date, rather than extracting the date from the object's key (filename), you can use:
import boto3

BUCKET = 'bucket-name-here'

session = boto3.Session(profile_name='dev')
s3_resource = session.resource('s3')

latest = None
latest_key = None
for obj in s3_resource.Bucket(BUCKET).objects.all():
    # Keep the object with the newest LastModified, ignoring "folder" placeholders
    if (latest is None or obj.last_modified > latest) and not obj.key.endswith('/'):
        latest = obj.last_modified
        latest_key = obj.key

print(latest_key)

if latest_key is not None:
    target_filename = latest_key.split('/')[-1]  # Assume you just want the filename portion
    s3_resource.Object(BUCKET, latest_key).download_file(target_filename)
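A variant of the same LastModified approach only scans the folder of interest, using `objects.filter(Prefix=...)` instead of `objects.all()`, and lets `max()` track the newest object. Note that S3 does not keep a separate creation time: LastModified is the time the object was last written (uploaded or overwritten). This is a minimal sketch; `newest_object_key` is a hypothetical name:

```python
def newest_object_key(bucket, prefix: str = ""):
    """Return the key of the most recently modified object under prefix, or None.

    `bucket` is expected to be a boto3 S3 Bucket resource (or anything with a
    compatible `objects.filter(Prefix=...)` returning items with `.key` and
    `.last_modified`).
    """
    candidates = (
        obj
        for obj in bucket.objects.filter(Prefix=prefix)
        if not obj.key.endswith("/")  # skip "folder" placeholder objects
    )
    newest = max(candidates, key=lambda obj: obj.last_modified, default=None)
    return None if newest is None else newest.key
```

For example, with `bucket = session.resource('s3').Bucket(BUCKET)`, calling `newest_object_key(bucket, 'folder_1/folder_2/folder_3/folder_4/')` returns the newest key in that folder (or `None` if it is empty), which can then be passed to `bucket.download_file(key, key.split('/')[-1])`.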