如何使用 boto 将文件从 Amazon S3 流式传输到 Rackspace Cloudfiles?

问题描述 投票:0回答:6

我正在将文件从 S3 复制到 Cloudfiles,我想避免将文件写入磁盘。 Python-Cloudfiles 库有一个 object.stream() 调用,看起来是我需要的,但我在 boto 中找不到等效的调用。我希望我能够做类似的事情:

shutil.copyfileobj(s3Object.stream(),rsObject.stream())

boto(或者我想是任何其他 s3 库)这可能吗?

python amazon-s3 boto rackspace cloudfiles
6个回答
87
投票

此线程中的其他答案与 boto 相关,但 S3.Object 在 boto3 中不再可迭代。因此,以下内容不起作用,它会产生

TypeError: 's3.Object' object is not iterable
错误消息:

s3 = boto3.session.Session(profile_name=my_profile).resource('s3')
s3_obj = s3.Object(bucket_name=my_bucket, key=my_key)

with io.FileIO('sample.txt', 'w') as file:
    for i in s3_obj:
        file.write(i)

在 boto3 中,对象的内容在

S3.Object.get()['Body']
处可用,这是自版本 1.9.68 以来的可迭代对象,但以前不是。因此,以下内容适用于最新版本的 boto3,但不适用于早期版本:

body = s3_obj.get()['Body']
with io.FileIO('sample.txt', 'w') as file:
    for i in body:
        file.write(i)

因此,较旧的 boto3 版本的替代方法是使用 read 方法,但这会在内存中加载整个 S3 对象,这在处理大文件时并不总是可行的:

body = s3_obj.get()['Body']
with io.FileIO('sample.txt', 'w') as file:
    for i in body.read():
        file.write(i)

但是

read
方法允许传入
amt
参数,指定我们要从底层流中读取的字节数。这个方法可以重复调用直到整个流被读取:

body = s3_obj.get()['Body']
with io.FileIO('sample.txt', 'w') as file:
    while file.write(body.read(amt=512)):
        pass

深入研究

botocore.response.StreamingBody
代码,发现底层流也是可用的,所以我们可以迭代如下:

body = s3_obj.get()['Body']
with io.FileIO('sample.txt', 'w') as file:
    for b in body._raw_stream:
        file.write(b)

虽然谷歌搜索我也看到了一些可以使用的链接,但我没有尝试过:


23
投票

boto 中的 Key 对象,代表 S3 中的对象,可以像迭代器一样使用,所以你应该能够做这样的事情:

>>> import boto
>>> c = boto.connect_s3()
>>> bucket = c.lookup('garnaat_pub')
>>> key = bucket.lookup('Scan1.jpg')
>>> for bytes in key:
...   write bytes to output stream

或者,就您的示例而言,您可以这样做:

>>> shutil.copyfileobj(key, rsObject.stream())

21
投票

我想至少有一些看到这个问题的人会像我一样,并且会想要一种方法来逐行(或逗号接逗号,或任何其他分隔符)从 boto 流式传输文件。这是一个简单的方法:

def getS3ResultsAsIterator(self, aws_access_info, key, prefix):        
    s3_conn = S3Connection(**aws_access)
    bucket_obj = s3_conn.get_bucket(key)
    # go through the list of files in the key
    for f in bucket_obj.list(prefix=prefix):
        unfinished_line = ''
        for byte in f:
            byte = unfinished_line + byte
            #split on whatever, or use a regex with re.split()
            lines = byte.split('\n')
            unfinished_line = lines.pop()
            for line in lines:
                yield line

@garnaat 上面的回答仍然很好,而且 100% 正确。希望我的仍然能帮助别人。


13
投票

Botocore的

StreamingBody
有一个
iter_lines()
方法:

https://botocore.amazonaws.com/v1/documentation/api/latest/reference/response.html#botocore.response.StreamingBody.iter_lines

所以:

import boto3
s3r = boto3.resource('s3')
iterator = s3r.Object(bucket, key).get()['Body'].iter_lines()

for line in iterator:
    print(line)

7
投票

这是我的包裹流体的方案:

import io
class S3ObjectInterator(io.RawIOBase):
    def __init__(self, bucket, key):
        """Initialize with S3 bucket and key names"""
        self.s3c = boto3.client('s3')
        self.obj_stream = self.s3c.get_object(Bucket=bucket, Key=key)['Body']

    def read(self, n=-1):
        """Read from the stream"""
        return self.obj_stream.read() if n == -1 else self.obj_stream.read(n)

用法示例:

obj_stream = S3ObjectInterator(bucket, key)
for line in obj_stream:
    print line

0
投票

如果您对其他选项持开放态度,smart_open 是一个用于在 Python 中流式传输大文件的实用程序,它使工作变得非常容易。

这里有两个例子:

import boto3
from smart_open import open

session = boto3.Session(
    aws_access_key_id="xxx",
    aws_secret_access_key="xxx",
)
client = session.client('s3')

for line in open(
    "s3://my-bucket/my-file.txt",
    transport_params=dict(client=client),
):
    print(line)

对于压缩文件:

import boto3
from smart_open import open

session = boto3.Session(
    aws_access_key_id="xxx",
    aws_secret_access_key="xxx",
)
client = session.client('s3')

for line in open(
    "s3://my-bucket/my-file.txt.gz",
    encoding="utf-8",
    transport_params=dict(client=client),
):
    print(line)
© www.soinside.com 2019 - 2024. All rights reserved.