How to upload crawled data from Scrapy to Amazon S3 as csv or json?
The problem was solved by adding the following line to the settings.py file:
ITEM_PIPELINES = {'scrapy.pipelines.files.S3FilesStore': 1}
along with the S3 credentials and the feed URI:
AWS_ACCESS_KEY_ID = 'access key id'
AWS_SECRET_ACCESS_KEY = 'access key'
FEED_URI = 's3://bucket/folder/filename.json'
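Since the question title also asks about CSV: a minimal sketch of the CSV variant, untested and with an illustrative path, would swap the feed settings like this (FEED_FORMAT is the standard Scrapy feed-export setting selecting the built-in CSV exporter):

AWS_ACCESS_KEY_ID = 'access key id'
AWS_SECRET_ACCESS_KEY = 'access key'
FEED_URI = 's3://bucket/folder/filename.csv'  # illustrative path, not from the original answer
FEED_FORMAT = 'csv'  # use Scrapy's built-in CSV item exporter instead of JSON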
Thank you guys for your guidance.
I've decided to answer Mil0R3's comment on Abhishek K's answer with the code snippet that worked for me.
In settings.py you need to add the following code:
AWS_ACCESS_KEY_ID = ''
AWS_SECRET_ACCESS_KEY = ''

# You need to have both variables FEED_URI and S3PIPELINE_URL set to the same
# file or this code will not work.
FEED_URI = 's3://{bucket}/{file_name}.jsonl'
S3PIPELINE_URL = FEED_URI

FEED_FORMAT = 'jsonlines'

# project_folder refers to the folder that both pipelines.py and settings.py are in
ITEM_PIPELINES = {
    '{project_folder}.pipelines.S3Pipeline': 1,
}
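For reference, the pipeline below also looks up a handful of optional settings; this sketch lists those names exactly as its __init__ reads them, with placeholder values (the values shown are just the pipeline's defaults, not recommendations):

# Optional settings read by S3Pipeline (see pipelines.py below).
S3PIPELINE_MAX_CHUNK_SIZE = 100  # items per uploaded chunk; pipeline default is 100
S3PIPELINE_GZIP = False          # gzip each chunk; defaults to True when the URL ends in '.gz'
AWS_REGION_NAME = None           # passed straight to boto3.client('s3', ...)
AWS_USE_SSL = None
AWS_VERIFY = None
AWS_ENDPOINT_URL = None          # useful for S3-compatible services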
Inside pipelines.py you need to add the following class. The GitHub project it is copied from can be found here: https://github.com/orangain/scrapy-s3pipeline
# -*- coding: utf-8 -*-

# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html

from io import BytesIO
from urllib.parse import urlparse
from datetime import datetime
import gzip

import boto3
from botocore.exceptions import ClientError

from scrapy.exporters import JsonLinesItemExporter


class S3Pipeline:
    """
    Scrapy pipeline to store items into S3 bucket with JSONLines format.
    Unlike FeedExporter, the pipeline has the following features:

    * The pipeline stores items by chunk.
    * Support GZip compression.
    """

    def __init__(self, settings, stats):
        self.stats = stats

        url = settings['S3PIPELINE_URL']
        o = urlparse(url)
        self.bucket_name = o.hostname
        self.object_key_template = o.path[1:]  # Remove the first '/'

        self.max_chunk_size = settings.getint('S3PIPELINE_MAX_CHUNK_SIZE', 100)
        self.use_gzip = settings.getbool('S3PIPELINE_GZIP', url.endswith('.gz'))

        self.s3 = boto3.client(
            's3',
            region_name=settings['AWS_REGION_NAME'],
            use_ssl=settings['AWS_USE_SSL'],
            verify=settings['AWS_VERIFY'],
            endpoint_url=settings['AWS_ENDPOINT_URL'],
            aws_access_key_id=settings['AWS_ACCESS_KEY_ID'],
            aws_secret_access_key=settings['AWS_SECRET_ACCESS_KEY'])
        self.items = []
        self.chunk_number = 0

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler.settings, crawler.stats)

    def process_item(self, item, spider):
        """
        Process single item. Add item to items and then upload to S3
        if size of items >= max_chunk_size.
        """
        self.items.append(item)
        if len(self.items) >= self.max_chunk_size:
            self._upload_chunk(spider)

        return item

    def open_spider(self, spider):
        """
        Callback function when spider is open.
        """
        # Store timestamp to replace {time} in S3PIPELINE_URL
        self.ts = datetime.utcnow().replace(microsecond=0).isoformat().replace(':', '-')

    def close_spider(self, spider):
        """
        Callback function when spider is closed.
        """
        # Upload remained items to S3.
        self._upload_chunk(spider)

    def _upload_chunk(self, spider):
        """
        Do upload items to S3.
        """
        if not self.items:
            return  # Do nothing when items is empty.

        f = self._make_fileobj()

        # Build object key by replacing variables in object key template.
        object_key = self.object_key_template.format(**self._get_uri_params(spider))

        try:
            self.s3.upload_fileobj(f, self.bucket_name, object_key)
        except ClientError:
            self.stats.inc_value('pipeline/s3/fail')
            raise
        else:
            self.stats.inc_value('pipeline/s3/success')
        finally:
            # Prepare for the next chunk
            self.chunk_number += len(self.items)
            self.items = []

    def _get_uri_params(self, spider):
        params = {}
        for key in dir(spider):
            params[key] = getattr(spider, key)
        params['chunk'] = self.chunk_number
        params['time'] = self.ts
        return params

    def _make_fileobj(self):
        """
        Build file object from items.
        """
        bio = BytesIO()
        f = gzip.GzipFile(mode='wb', fileobj=bio) if self.use_gzip else bio

        # Build file object using ItemExporter
        exporter = JsonLinesItemExporter(f)
        exporter.start_exporting()
        for item in self.items:
            exporter.export_item(item)
        exporter.finish_exporting()

        if f is not bio:
            f.close()  # Close the file if GzipFile

        # Seek to the top of file to be read later
        bio.seek(0)

        return bio
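For completeness, here is a minimal spider sketch whose yielded items would flow through the pipeline above; the QuotesSpider name and quotes.toscrape.com target are purely illustrative, not part of the original answer:

import scrapy


class QuotesSpider(scrapy.Spider):
    # Hypothetical spider for illustration; any spider that yields items works.
    name = 'quotes'
    start_urls = ['http://quotes.toscrape.com/']

    def parse(self, response):
        for quote in response.css('div.quote'):
            # Each yielded dict is handed to S3Pipeline.process_item(),
            # buffered, and uploaded to S3 in chunks of max_chunk_size.
            yield {
                'text': quote.css('span.text::text').get(),
                'author': quote.css('small.author::text').get(),
            }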
Special Notes:
I needed to remove some settings from the OP's settings.py file for this pipeline to work correctly. All of the following need to be removed:
FEED_EXPORT_FIELDS = None
FEED_STORE_EMPTY = False
FEED_STORAGES = {}
FEED_STORAGES_BASE = {
    '': None,
    'file': None,
    'stdout': None,
    's3': 'scrapy.extensions.feedexport.S3FeedStorage',
    'ftp': None,
}
FEED_EXPORTERS = {}
FEED_EXPORTERS_BASE = {
    'json': None,
    'jsonlines': None,
    'jl': None,
    'csv': None,
    'xml': None,
    'marshal': None,
    'pickle': None,
}
Also, be sure the S3PIPELINE_URL variable is set to the same value as FEED_URI.
Not removing the above settings from settings.py, or not setting those two variables to the same value, will result in a .jsonl file showing up in your S3 bucket that contains multiple copies of only a single item. I have no idea why that happens, though...
This took me a few hours to figure out, so I hope it saves someone some time.