How to upload crawled data from Scrapy to Amazon S3 as csv or json?

The problem was solved by adding the following line to the settings.py file:

ITEM_PIPELINES = {'scrapy.pipelines.files.S3FilesStore': 1}

along with the S3 credentials mentioned earlier.

AWS_ACCESS_KEY_ID = 'access key id'
AWS_SECRET_ACCESS_KEY = 'access key'
FEED_URI = 's3://bucket/folder/filename.json'
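For reference, a minimal consolidated settings.py sketch of this approach might look like the following. The bucket, folder, and filename are placeholders, FEED_FORMAT can also be 'csv' for CSV output, and Scrapy's built-in S3 feed storage needs botocore (boto in older versions) installed:

# settings.py -- a minimal sketch, assuming the bucket/folder/filename
# below are placeholders and botocore is available for S3 feed storage.
AWS_ACCESS_KEY_ID = 'access key id'
AWS_SECRET_ACCESS_KEY = 'access key'

ITEM_PIPELINES = {'scrapy.pipelines.files.S3FilesStore': 1}

# The feed exporter writes all scraped items to this S3 key.
FEED_URI = 's3://bucket/folder/filename.json'
FEED_FORMAT = 'json'  # use 'csv' here for CSV output instead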

Thank you guys for your guidance.


I've decided to answer Mil0R3's comment on Abhishek K's answer with the code snippet that worked for me.

In settings.py you need to add the following code:

AWS_ACCESS_KEY_ID = ''
AWS_SECRET_ACCESS_KEY = ''

# You need to have both variables FEED_URI and S3PIPELINE_URL set to the same
# file or this code will not work.
FEED_URI = 's3://{bucket}/{file_name}.jsonl'
S3PIPELINE_URL = FEED_URI
FEED_FORMAT = 'jsonlines'

# project_folder refers to the folder that both pipelines.py and settings.py are in
ITEM_PIPELINES = {
    '{project_folder}.pipelines.S3Pipeline': 1,
}
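As a concrete illustration (the bucket, file, and project names here are hypothetical), the placeholders above might be filled in like this:

AWS_ACCESS_KEY_ID = 'access key id'
AWS_SECRET_ACCESS_KEY = 'access key'

FEED_URI = 's3://my-bucket/crawl_output.jsonl'
S3PIPELINE_URL = FEED_URI
FEED_FORMAT = 'jsonlines'

ITEM_PIPELINES = {
    'myproject.pipelines.S3Pipeline': 1,
}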

Inside pipelines.py you need to add the following class. The GitHub project this is copied and pasted from can be found here: https://github.com/orangain/scrapy-s3pipeline

# -*- coding: utf-8 -*-
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html
from io import BytesIO
from urllib.parse import urlparse
from datetime import datetime
import gzip

import boto3
from botocore.exceptions import ClientError

from scrapy.exporters import JsonLinesItemExporter


class S3Pipeline:
    """
    Scrapy pipeline to store items into S3 bucket with JSONLines format.
    Unlike FeedExporter, the pipeline has the following features:

    * The pipeline stores items by chunk.
    * Support GZip compression.
    """

    def __init__(self, settings, stats):
        self.stats = stats

        url = settings['S3PIPELINE_URL']
        o = urlparse(url)
        self.bucket_name = o.hostname
        self.object_key_template = o.path[1:]  # Remove the first '/'

        self.max_chunk_size = settings.getint('S3PIPELINE_MAX_CHUNK_SIZE', 100)
        self.use_gzip = settings.getbool('S3PIPELINE_GZIP', url.endswith('.gz'))

        self.s3 = boto3.client(
            's3',
            region_name=settings['AWS_REGION_NAME'], use_ssl=settings['AWS_USE_SSL'],
            verify=settings['AWS_VERIFY'], endpoint_url=settings['AWS_ENDPOINT_URL'],
            aws_access_key_id=settings['AWS_ACCESS_KEY_ID'],
            aws_secret_access_key=settings['AWS_SECRET_ACCESS_KEY'])
        self.items = []
        self.chunk_number = 0

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler.settings, crawler.stats)

    def process_item(self, item, spider):
        """
        Process single item. Add item to items and then upload to S3 if size of items
        >= max_chunk_size.
        """
        self.items.append(item)
        if len(self.items) >= self.max_chunk_size:
            self._upload_chunk(spider)

        return item

    def open_spider(self, spider):
        """
        Callback function when spider is open.
        """
        # Store timestamp to replace {time} in S3PIPELINE_URL
        self.ts = datetime.utcnow().replace(microsecond=0).isoformat().replace(':', '-')

    def close_spider(self, spider):
        """
        Callback function when spider is closed.
        """
        # Upload remained items to S3.
        self._upload_chunk(spider)

    def _upload_chunk(self, spider):
        """
        Do upload items to S3.
        """
        if not self.items:
            return  # Do nothing when items is empty.

        f = self._make_fileobj()

        # Build object key by replacing variables in object key template.
        object_key = self.object_key_template.format(**self._get_uri_params(spider))

        try:
            self.s3.upload_fileobj(f, self.bucket_name, object_key)
        except ClientError:
            self.stats.inc_value('pipeline/s3/fail')
            raise
        else:
            self.stats.inc_value('pipeline/s3/success')
        finally:
            # Prepare for the next chunk
            self.chunk_number += len(self.items)
            self.items = []

    def _get_uri_params(self, spider):
        params = {}
        for key in dir(spider):
            params[key] = getattr(spider, key)

        params['chunk'] = self.chunk_number
        params['time'] = self.ts

        return params

    def _make_fileobj(self):
        """
        Build file object from items.
        """
        bio = BytesIO()
        f = gzip.GzipFile(mode='wb', fileobj=bio) if self.use_gzip else bio

        # Build file object using ItemExporter
        exporter = JsonLinesItemExporter(f)
        exporter.start_exporting()
        for item in self.items:
            exporter.export_item(item)
        exporter.finish_exporting()

        if f is not bio:
            f.close()  # Close the file if GzipFile

        # Seek to the top of file to be read later
        bio.seek(0)

        return bio

Special Notes:

I needed to remove some settings from the OP's settings.py file for this pipeline to work correctly. All of the following will need to be removed:

FEED_EXPORT_FIELDS = None
FEED_STORE_EMPTY = False
FEED_STORAGES = {}
FEED_STORAGES_BASE = {
    '': None,
    'file': None,
    'stdout': None,
    's3': 'scrapy.extensions.feedexport.S3FeedStorage',
    'ftp': None,
}
FEED_EXPORTERS = {}
FEED_EXPORTERS_BASE = {
    'json': None,
    'jsonlines': None,
    'jl': None,
    'csv': None,
    'xml': None,
    'marshal': None,
    'pickle': None,
}

Also, be sure that the S3PIPELINE_URL variable is set equal to FEED_URI.

Either not removing the above settings from settings.py or not setting the two variables equal to each other will result in a .jsonl file showing up inside your S3 bucket, but containing multiple copies of only a single item. I have no idea why that happens, though...

This took me a few hours to figure out, so I hope it saves someone some time.