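I have a folder containing a bunch of subfolders and files. I fetch these subfolders and files from a server and assign them to a variable. The folder structure is as follows: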
└── main_folder
    ├── folder
    │   ├── folder
    │   │   ├── folder
    │   │   │   └── a.json
    │   │   ├── folder
    │   │   │   ├── folder
    │   │   │   │   └── b.json
    │   │   │   ├── folder
    │   │   │   │   └── c.json
    │   │   │   └── folder
    │   │   │       └── d.json
    │   │   └── folder
    │   │       └── e.json
    │   ├── folder
    │   │   └── f.json
    │   └── folder
    │       └── i.json
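Now I want to upload this main_folder to an S3 bucket with the same structure using boto3. boto3 has no call that uploads a whole folder to S3.
I have seen the solutions at the links below, but they read the files from the local machine, whereas I get the data from a server and assign it to a variable.
Uploading a folder full of files to a specific folder in Amazon S3
https://gist.github.com/feelinc/d1f541af4f31d09a2ec3
Has anyone faced the same problem?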
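Since S3 has no real folders, only object keys that may contain `/`, data that is already held in a variable does not have to be written to the local disk first. Here is a minimal sketch, assuming the server data ends up in a dict that maps relative paths to bytes. The names `upload_mapping`, `files` and `prefix` are made up for illustration; `put_object` is the regular boto3 client call.

```python
import posixpath


def upload_mapping(client, bucket, files, prefix=""):
    """Upload in-memory data to S3, one object per dict entry.

    client : anything with a boto3-style put_object(Bucket=..., Key=..., Body=...)
    files  : dict mapping a relative path such as "folder/a.json" to bytes
             (assumed shape; adapt it to however your server data is stored)
    prefix : optional key prefix, e.g. "main_folder"
    Returns the list of keys that were written.
    """
    keys = []
    for rel_path, body in files.items():
        # S3 keys always use "/" regardless of the local OS
        rel_path = rel_path.replace("\\", "/").lstrip("/")
        key = posixpath.join(prefix, rel_path) if prefix else rel_path
        client.put_object(Bucket=bucket, Key=key, Body=body)
        keys.append(key)
    return keys


# Usage (needs boto3 and AWS credentials, so it is left commented out):
# import boto3
# files = {"folder/folder/a.json": b'{"x": 1}', "folder/f.json": b"{}"}
# upload_mapping(boto3.client("s3"), "my-bucket", files, prefix="main_folder")
```

The "folders" then show up in the S3 console simply because the keys share prefixes; nothing has to be created for them separately.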
Below is the code that works for me, pure python3.
""" upload one directory from the current working directory to aws """
from pathlib import Path
import os
import glob

import boto3


def upload_dir(localDir, awsInitDir, bucketName, tag, prefix='/'):
    """
    from current working directory, upload a 'localDir' with all its subcontents
    (files and subdirectories...) to an aws bucket

    Parameters
    ----------
    localDir :   local directory to be uploaded, relative to the current working directory
    awsInitDir : prefix 'directory' in aws
    bucketName : bucket in aws
    tag :        pattern to select files, like *png
                 NOTE: if you use tag it must be given like --tag '*txt', in some quotation marks... for argparse
    prefix :     to remove initial '/' from file names

    Returns
    -------
    None
    """
    s3 = boto3.resource('s3')
    cwd = str(Path.cwd())
    p = Path(os.path.join(Path.cwd(), localDir))
    mydirs = list(p.glob('**'))  # localDir itself plus every subdirectory
    for mydir in mydirs:
        fileNames = glob.glob(os.path.join(mydir, tag))
        fileNames = [f for f in fileNames if not Path(f).is_dir()]
        for fileName in fileNames:
            # make the path relative to the current working directory
            fileName = str(fileName).replace(cwd, '')
            if fileName.startswith(prefix):  # only modify the text if it starts with the prefix
                fileName = fileName.replace(prefix, "", 1)  # remove one instance of prefix
            print(f"fileName {fileName}")
            # S3 keys use '/', so normalise the separator that os.path.join produces
            awsPath = os.path.join(awsInitDir, str(fileName)).replace(os.sep, '/')
            s3.meta.client.upload_file(fileName, bucketName, awsPath)


if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--localDir", help="which dir to upload to aws")
    parser.add_argument("--bucketName", help="to which bucket to upload in aws")
    parser.add_argument("--awsInitDir", help="to which 'directory' in aws")
    parser.add_argument("--tag", help="some tag to select files, like *png", default='*')
    args = parser.parse_args()

    # cd whatever is above your dir, then run it
    # (below assuming this script is in ~/git/hu-libraries/netRoutines/uploadDir2Aws.py )
    # in the example below you have directory structure ~/Downloads/IO
    # you copy full directory of ~/Downloads/IO to aws bucket markus1 to 'directory' 2020/IO
    # NOTE: if you use tag it must be given like --tag '*txt', in some quotation marks...

    # cd ~/Downloads
    # python ~/git/hu-libraries/netRoutines/uploadDir2Aws.py --localDir IO --bucketName markus1 --awsInitDir 2020
    upload_dir(localDir=args.localDir, bucketName=args.bucketName,
               awsInitDir=args.awsInitDir, tag=args.tag)
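Before uploading for real, it can help to preview which keys a run would create. The following is a rough sketch and not part of the script above: `preview_keys` is a hypothetical helper that uses `Path.rglob` and `fnmatch` in place of the nested glob calls, and it only returns the (local path, S3 key) pairs without contacting AWS.

```python
from fnmatch import fnmatch
from pathlib import Path


def preview_keys(local_dir, aws_init_dir, tag="*", base=None):
    """Return sorted (local_path, s3_key) pairs without contacting AWS.

    The key is aws_init_dir plus the path relative to `base` (default: the
    current working directory), so the name of local_dir stays in the key.
    """
    base = Path(base) if base is not None else Path.cwd()
    root = base / local_dir
    pairs = []
    for path in root.rglob("*"):
        # keep regular files whose name matches the pattern, e.g. "*png"
        if path.is_file() and fnmatch(path.name, tag):
            rel = path.relative_to(base).as_posix()  # always "/" separated
            key = f"{aws_init_dir.rstrip('/')}/{rel}" if aws_init_dir else rel
            pairs.append((str(path), key))
    return sorted(pairs)
```

For the example in the comments, calling `preview_keys("IO", "2020")` from `~/Downloads` would list keys that start with `2020/IO/`.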
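I had to solve this problem myself, so I thought I would include a snippet of my code here.
I also needed to filter for specific file types and to upload only the directory contents (as opposed to the directory itself).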
import logging
from pathlib import Path
from typing import Union

import boto3

log = logging.getLogger(__name__)


# NOTE: this appears to be a method lifted from a larger class (hence `self`);
# the snippet assumes that class provides upload_file(s3_key, file_name).
def upload_dir(
    self,
    local_dir: Union[str, Path],
    s3_path: str = "/",
    file_type: str = "",
    contents_only: bool = False,
) -> dict:
    """
    Upload the content of a local directory to a bucket path.

    Args:
        local_dir (Union[str, Path]): Directory to upload files from.
        s3_path (str, optional): The path within the bucket to upload to.
            If omitted, the bucket root is used.
        file_type (str, optional): Upload files with extension only, e.g. txt.
        contents_only (bool): Used to copy only the directory contents to the
            specified path, not the directory itself.

    Returns:
        dict: key:value pair of file_name:upload_status.
            upload_status True if uploaded, False if failed.
    """
    # Placeholder values; replace "xxx" with your own settings
    resource = boto3.resource(
        "s3",
        aws_access_key_id="xxx",
        aws_secret_access_key="xxx",
        endpoint_url="xxx",
        region_name="xxx",
    )

    status_dict = {}

    local_dir_path = Path(local_dir).resolve()
    log.debug(f"Directory to upload: {local_dir_path}")

    # "**" matches local_dir itself and every subdirectory below it
    all_subdirs = local_dir_path.glob("**")

    for dir_path in all_subdirs:
        log.debug(f"Searching for files in directory: {dir_path}")
        file_names = dir_path.glob(f"*{('.' + file_type) if file_type else ''}")

        # Only return valid files
        file_names = [f for f in file_names if f.is_file()]
        log.debug(f"Files found: {file_names}")

        for file_name in file_names:
            # Build the key with "/" separators and without a leading "/"
            s3_key = (Path(s3_path) / file_name.relative_to(
                local_dir_path if contents_only else local_dir_path.parent
            )).as_posix().lstrip("/")
            log.debug(f"S3 key to upload: {s3_key}")
            status_dict[str(file_name)] = self.upload_file(s3_key, file_name)

    return status_dict
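The effect of `contents_only` on the resulting key is easiest to see in isolation. The sketch below repeats only the key computation with `PurePosixPath`, so it runs without any files or AWS access. `make_key` is a name invented here, and stripping the leading `/` is an assumption about how the keys should look.

```python
from pathlib import PurePosixPath


def make_key(file_path, local_dir, s3_path="/", contents_only=False):
    """Build an S3 key, mirroring the key computation in upload_dir."""
    local_dir = PurePosixPath(local_dir)
    # Relative to the directory itself, or to its parent to keep its name
    anchor = local_dir if contents_only else local_dir.parent
    rel = PurePosixPath(file_path).relative_to(anchor)
    # Assumption: drop the leading "/" so keys start at the bucket root
    return str(PurePosixPath(s3_path) / rel).lstrip("/")


print(make_key("/data/main_folder/folder/a.json", "/data/main_folder"))
# main_folder/folder/a.json
print(make_key("/data/main_folder/folder/a.json", "/data/main_folder", contents_only=True))
# folder/a.json
print(make_key("/data/main_folder/folder/a.json", "/data/main_folder", s3_path="backup"))
# backup/main_folder/folder/a.json
```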
Well, I like recursive code, so here is a simple example:
import os

import boto3
from botocore.exceptions import ClientError

BUCKET_NAME = "xxx"  # placeholder, not defined in the original snippet: set it to your bucket name


def upload_file(file_name, object_name=None):
    # If no S3 object name is given, use the file's base name
    if object_name is None:
        object_name = os.path.basename(file_name)
    s3_client = boto3.client('s3')
    try:
        s3_client.upload_file(file_name, BUCKET_NAME, object_name)
    except ClientError as e:
        print(e)
        return False
    return True


def upload_dir_recursive(localDir, awsInitDir, space=""):
    # awsInitDir is used as a raw key prefix, so it should be empty or end with '/'
    print(space + "Processing dir: " + localDir)
    for file in os.listdir(localDir):
        file_path = os.path.join(localDir, file)
        if file != "logs":  # skip anything named "logs"
            if os.path.isfile(file_path):
                upload_file(file_path, awsInitDir + file)
            else:
                # descend into the subdirectory and extend the key prefix
                upload_dir_recursive(file_path, awsInitDir + file + '/', space + "  ")
    print(space + "... Done")


if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--localDir", help="which dir to upload to aws")
    parser.add_argument("--awsInitDir", help="to which 'directory' in aws")
    args = parser.parse_args()

    upload_dir_recursive(localDir=args.localDir, awsInitDir=args.awsInitDir)
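Note that upload_file is taken directly from the boto3 documentation. Compared with the other answers, this solution walks through all subdirectories, however deeply they are nested. I did add a filter (it skips anything named logs), but that is not hard to adjust.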
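If recursion is not a requirement, `os.walk` from the standard library also visits every nesting level, and a file-type filter becomes a one-line check. Here is a sketch under those assumptions. `iter_uploads`, `extensions` and `skip_dirs` are hypothetical names, and the function only yields the pairs you would pass on to `upload_file`. Unlike the recursive version, it skips only directories named logs, not files.

```python
import os


def iter_uploads(local_dir, aws_init_dir="", extensions=None, skip_dirs=("logs",)):
    """Yield (file_path, object_name) for every file below local_dir.

    extensions : optional tuple such as (".json",); None means all files
    skip_dirs  : directory names that are not descended into
    """
    prefix = aws_init_dir.strip("/")
    for dirpath, dirnames, filenames in os.walk(local_dir):
        # Editing dirnames in place stops os.walk from entering those dirs
        dirnames[:] = sorted(d for d in dirnames if d not in skip_dirs)
        rel_dir = os.path.relpath(dirpath, local_dir)
        for name in sorted(filenames):
            if extensions and not name.endswith(tuple(extensions)):
                continue
            # Join prefix, relative directory parts and file name with "/"
            parts = [prefix] + ([] if rel_dir == "." else rel_dir.split(os.sep)) + [name]
            yield os.path.join(dirpath, name), "/".join(p for p in parts if p)
```

A loop such as `for path, key in iter_uploads("main_folder", "main_folder", extensions=(".json",)): upload_file(path, key)` would then upload only the JSON files.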