How do I upload data scraped with Scrapy to Amazon S3 in CSV or JSON format?

Question (votes: 0, answers: 4)

What are the steps to upload data scraped with Scrapy to Amazon S3 as a csv/jsonl/json file? All I could find on the internet was about uploading scraped images to an S3 bucket.

I am currently on Ubuntu 16.04 and have installed boto with the command

pip install boto

I have added the following lines to settings.py. Can anyone explain what other changes I need to make?

AWS_ACCESS_KEY_ID = 'access key id'
AWS_SECRET_ACCESS_KEY= 'access key'


FEED_URI = 'bucket path'
FEED_FORMAT = 'jsonlines'
FEED_EXPORT_FIELDS = None
FEED_STORE_EMPTY = False
FEED_STORAGES = {}
FEED_STORAGES_BASE = {
    '': None,
    'file': None,
    'stdout': None,
    's3': 'scrapy.extensions.feedexport.S3FeedStorage',
    'ftp': None,
}
FEED_EXPORTERS = {}
FEED_EXPORTERS_BASE = {
    'json': None,
    'jsonlines': None,
    'jl': None,
    'csv': None,
    'xml': None,
    'marshal': None,
    'pickle': None,
}

Edit 1: When I configure all of the above and run

scrapy crawl spider

the following error appears after the crawl finishes.

2016-08-08 10:57:03 [scrapy] ERROR: Error storing csv feed (200 items) in: s3: myBucket/crawl.csv
Traceback (most recent call last):
  File "/usr/lib/python2.7/dist-packages/twisted/python/threadpool.py", line 246, in inContext
    result = inContext.theWork()
  File "/usr/lib/python2.7/dist-packages/twisted/python/threadpool.py", line 262, in <lambda>
    inContext.theWork = lambda: context.call(ctx, func, *args, **kw)
  File "/usr/lib/python2.7/dist-packages/twisted/python/context.py", line 118, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "/usr/lib/python2.7/dist-packages/twisted/python/context.py", line 81, in callWithContext
    return func(*args,**kw)
  File "/usr/local/lib/python2.7/dist-packages/scrapy/extensions/feedexport.py", line 123, in _store_in_thread
    key.set_contents_from_file(file)
  File "/usr/local/lib/python2.7/dist-packages/boto/s3/key.py", line 1293, in set_contents_from_file
    chunked_transfer=chunked_transfer, size=size)
  File "/usr/local/lib/python2.7/dist-packages/boto/s3/key.py", line 750, in send_file
    chunked_transfer=chunked_transfer, size=size)
  File "/usr/local/lib/python2.7/dist-packages/boto/s3/key.py", line 951, in _send_file_internal
    query_args=query_args
  File "/usr/local/lib/python2.7/dist-packages/boto/s3/connection.py", line 656, in make_request
    auth_path = self.calling_format.build_auth_path(bucket, key)
  File "/usr/local/lib/python2.7/dist-packages/boto/s3/connection.py", line 94, in build_auth_path
    path = '/' + bucket
TypeError: cannot concatenate 'str' and 'NoneType' objects
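For what it's worth, the traceback shows boto receiving None as the bucket name (the failure is in path = '/' + bucket), which usually means the feed URI could not be split into a bucket and a key; note that the log prints the target as "s3: myBucket/crawl.csv" rather than an s3:// URL. A minimal sketch of a well-formed feed URI for this older, pre-FEEDS setup, reusing the bucket and file name from the log:

# settings.py -- bucket name and key are placeholders taken from the log above
FEED_URI = 's3://myBucket/crawl.csv'   # note the '//' after 's3:'
FEED_FORMAT = 'csv'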
python json amazon-s3 web-scraping scrapy
4 Answers

10 votes

As of 2021, this task has become much easier.

  • FEED_URI and FEED_FORMAT are deprecated and have been moved into a new setting called FEEDS.
  • There is no need to define a custom item pipeline in settings.py.
  • You must install botocore for this to work.

Here is what you have to add to settings.py:

AWS_ACCESS_KEY_ID = 'your_access_key_id'
AWS_SECRET_ACCESS_KEY = 'your_secret_access_key'

FEEDS = {
    's3://your-bucket/path-to-data/%(name)s/data.json': {
        'format': 'json',
        'encoding': 'utf8',
        'store_empty': False,
        'indent': 4,
    }
}

A list of all available options can be found in the docs.
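As a side note, recent Scrapy releases accept a few more per-feed options in FEEDS; for example, a sketch that splits a large crawl into numbered files with batch_item_count (bucket and path are placeholders, and the URI must then contain a %(batch_id)d or %(batch_time)s placeholder):

FEEDS = {
    's3://your-bucket/path-to-data/%(name)s/batch-%(batch_id)d.jsonl': {
        'format': 'jsonlines',
        'encoding': 'utf8',
        'store_empty': False,
        'batch_item_count': 1000,  # start a new file every 1000 items
    }
}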

If you also want to store files or images in S3, you need to specify an item pipeline and a storage variable in settings.py:

ITEM_PIPELINES = {
    'scrapy.pipelines.images.ImagesPipeline': 1,  # For images; Pillow must be installed
    'scrapy.pipelines.files.FilesPipeline': 2,    # For files
}

IMAGES_STORE = 's3://your-bucket/path_to_images_dir/'
FILES_STORE = 's3://your-bucket/path_to_files_dir/'
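A quick note on what those two pipelines expect: by default ImagesPipeline reads download URLs from an image_urls field and stores its results in images, while FilesPipeline uses file_urls and files. A minimal item sketch using those default field names (the item class name is just illustrative):

import scrapy

class MediaItem(scrapy.Item):
    # Read by the pipelines
    image_urls = scrapy.Field()
    file_urls = scrapy.Field()
    # Populated by the pipelines with download metadata
    images = scrapy.Field()
    files = scrapy.Field()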

9 votes

Solved the problem by adding the following lines to the settings.py file:

ITEM_PIPELINES = {
    'scrapy.pipelines.files.S3FilesStore': 1
}

along with the S3 credentials mentioned earlier:

AWS_ACCESS_KEY_ID = 'access key id'
AWS_SECRET_ACCESS_KEY= 'access key'

FEED_URI='s3://bucket/folder/filename.json'

Thank you all for the guidance.


6 votes

I decided to answer Mil0R3's comment on Abhishek K's answer with the code snippet that worked for me.

The following code needs to be added to settings.py:

AWS_ACCESS_KEY_ID = ''
AWS_SECRET_ACCESS_KEY = ''

# You need to have both variables FEED_URI and S3PIPELINE_URL set to the same
# file or this code will not work.
FEED_URI = 's3://{bucket}/{file_name}.jsonl'
S3PIPELINE_URL = FEED_URI
FEED_FORMAT = 'jsonlines'

# project_folder refers to the folder that both pipelines.py and settings.py are in
ITEM_PIPELINES = {
    '{project_folder}.pipelines.S3Pipeline': 1,
}

In pipelines.py you need to add the following class. The GitHub project it was copied from can be found here: https://github.com/orangain/scrapy-s3pipeline

# -*- coding: utf-8 -*-

# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html


from io import BytesIO
from urllib.parse import urlparse
from datetime import datetime
import gzip

import boto3
from botocore.exceptions import ClientError

from scrapy.exporters import JsonLinesItemExporter

class S3Pipeline:
    """
    Scrapy pipeline to store items into S3 bucket with JSONLines format.
    Unlike FeedExporter, the pipeline has the following features:
    * The pipeline stores items by chunk.
    * Support GZip compression.
    """

    def __init__(self, settings, stats):
        self.stats = stats

        url = settings['S3PIPELINE_URL']
        o = urlparse(url)
        self.bucket_name = o.hostname
        self.object_key_template = o.path[1:]  # Remove the first '/'

        self.max_chunk_size = settings.getint('S3PIPELINE_MAX_CHUNK_SIZE', 100)
        self.use_gzip = settings.getbool('S3PIPELINE_GZIP', url.endswith('.gz'))

        self.s3 = boto3.client(
            's3',
            region_name=settings['AWS_REGION_NAME'], use_ssl=settings['AWS_USE_SSL'],
            verify=settings['AWS_VERIFY'], endpoint_url=settings['AWS_ENDPOINT_URL'],
            aws_access_key_id=settings['AWS_ACCESS_KEY_ID'],
            aws_secret_access_key=settings['AWS_SECRET_ACCESS_KEY'])
        self.items = []
        self.chunk_number = 0

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler.settings, crawler.stats)

    def process_item(self, item, spider):
        """
        Process single item. Add item to items and then upload to S3 if size of items
        >= max_chunk_size.
        """
        self.items.append(item)
        if len(self.items) >= self.max_chunk_size:
            self._upload_chunk(spider)

        return item

    def open_spider(self, spider):
        """
        Callback function when spider is open.
        """
        # Store timestamp to replace {time} in S3PIPELINE_URL
        self.ts = datetime.utcnow().replace(microsecond=0).isoformat().replace(':', '-')

    def close_spider(self, spider):
        """
        Callback function when spider is closed.
        """
        # Upload remained items to S3.
        self._upload_chunk(spider)

    def _upload_chunk(self, spider):
        """
        Do upload items to S3.
        """

        if not self.items:
            return  # Do nothing when items is empty.

        f = self._make_fileobj()

        # Build object key by replacing variables in object key template.
        object_key = self.object_key_template.format(**self._get_uri_params(spider))

        try:
            self.s3.upload_fileobj(f, self.bucket_name, object_key)
        except ClientError:
            self.stats.inc_value('pipeline/s3/fail')
            raise
        else:
            self.stats.inc_value('pipeline/s3/success')
        finally:
            # Prepare for the next chunk
            self.chunk_number += len(self.items)
            self.items = []

    def _get_uri_params(self, spider):
        params = {}
        for key in dir(spider):
            params[key] = getattr(spider, key)

        params['chunk'] = self.chunk_number
        params['time'] = self.ts
        return params

    def _make_fileobj(self):
        """
        Build file object from items.
        """

        bio = BytesIO()
        f = gzip.GzipFile(mode='wb', fileobj=bio) if self.use_gzip else bio

        # Build file object using ItemExporter
        exporter = JsonLinesItemExporter(f)
        exporter.start_exporting()
        for item in self.items:
            exporter.export_item(item)
        exporter.finish_exporting()

        if f is not bio:
            f.close()  # Close the file if GzipFile

        # Seek to the top of file to be read later
        bio.seek(0)

        return bio
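For reference, the placeholders in the key part of S3PIPELINE_URL are expanded by _get_uri_params() above from the spider's attributes plus chunk and time, so each uploaded chunk can get its own object key. A sketch of the settings this class reads (the bucket name is a placeholder):

# settings.py -- 'your-bucket' is a placeholder; {name}, {time} and {chunk}
# are filled in at upload time by S3Pipeline._get_uri_params()
S3PIPELINE_URL = 's3://your-bucket/{name}/{time}/items.{chunk:07d}.jl.gz'
S3PIPELINE_MAX_CHUNK_SIZE = 100  # items per uploaded chunk
S3PIPELINE_GZIP = True           # or rely on the '.gz' suffix detection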

A special note:

I needed to remove some data from the OP's settings.py file to get this pipeline working properly. All of the following needs to be removed:

FEED_EXPORT_FIELDS = None
FEED_STORE_EMPTY = False
FEED_STORAGES = {}
FEED_STORAGES_BASE = {
    '': None,
    'file': None,
    'stdout': None,
    's3': 'scrapy.extensions.feedexport.S3FeedStorage',
    'ftp': None,
}
FEED_EXPORTERS = {}
FEED_EXPORTERS_BASE = {
    'json': None,
    'jsonlines': None,
    'jl': None,
    'csv': None,
    'xml': None,
    'marshal': None,
    'pickle': None,
}

Also, make sure the S3PIPELINE_URL variable is set to the same value as FEED_URI.

If you don't remove the settings above from settings.py, or don't set those two variables equal to each other, a jsonl file will still show up in your S3 bucket, but it will contain nothing except multiple copies of a single item. I have no idea why that happens...

This took me a few hours to figure out, so I hope it saves someone some time.


0 votes

The following steps will let you easily save csv or json data both locally and directly to an AWS S3 bucket.

Step 1: Install python-dotenv by running pip install python-dotenv

Step 2: Create a .env file in your Scrapy project to store your AWS credentials securely (a minimal sketch of the .env wiring appears after the snippet in step 4).

Step 3: Install boto3 and s3fs with pip

Step 4: In your spider file, directly below the spider name, copy and paste the snippet below and edit it accordingly. Don't forget to import the os module.

custom_settings = {
    'AWS_ACCESS_KEY_ID': os.environ.get('AWS_ACCESS_KEY_ID'),
    'AWS_SECRET_ACCESS_KEY': os.environ.get('AWS_SECRET_ACCESS_KEY'),

    'FEEDS': {
        's3://your-bucket-name/%(name)s/%(name)s_%(time)s.json': {
            'format': 'json',
            'encoding': 'utf8',
            'store_empty': False,
            'indent': 4,
        },
        's3://your-bucket-name/%(name)s/%(name)s_%(time)s.csv': {
            'format': 'csv',
            'encoding': 'utf-8',
            'store_empty': False,
        },
        'local_file.json': {'format': 'json', 'overwrite': True},
        'local_file.csv': {'format': 'csv', 'overwrite': True},
    }
}
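Steps 2 and 4 leave the dotenv wiring implicit: the values in .env only reach os.environ.get() if the file is loaded first. A minimal sketch of the .env file and the corresponding imports at the top of the spider module (key names match the snippet above; the values are placeholders):

# .env (in the project root)
#   AWS_ACCESS_KEY_ID=your_access_key_id
#   AWS_SECRET_ACCESS_KEY=your_secret_access_key

# top of the spider file
import os
from dotenv import load_dotenv

load_dotenv()  # reads .env so os.environ.get() can see the credentials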

Finally, make sure the IAM user you created has at least write access to the S3 bucket; you can attach the AmazonS3FullAccess policy to that user.

Happy coding!
