"""MongoDB pipeline for Scrapy."""
from twisted.internet.threads import deferToThread
#from scrapy.utils.serialize import ScrapyJSONEncoder
from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError
from scrapy.exceptions import DropItem
class MongoPipeline(object):
    """Push scraped items into MongoDB, dropping duplicates by title.

    Items are inserted asynchronously via a thread so the reactor is
    not blocked by the (synchronous) PyMongo driver.
    """

    def __init__(self, host, port):
        # Bug fix: the original ignored host/port and always connected to
        # the default localhost client; honor the configured server.
        client = MongoClient(host, port)
        db = client.demo  # mongodb database
        self.demo_items = db.items  # mongodb database collection
        # Assuming the title field is unique; the unique index makes a
        # duplicate insert raise DuplicateKeyError, which drops the item.
        # (create_index replaces the deprecated ensure_index.)
        self.demo_items.create_index([('title', 1)], unique=True)

    @classmethod
    def from_settings(cls, settings):
        """Build the pipeline from Scrapy settings (MONGO_DB_HOST/PORT)."""
        host = settings.get('MONGO_DB_HOST', '127.0.0.1')
        port = settings.get('MONGO_DB_PORT', 27017)
        return cls(host, port)

    def process_item(self, item, spider):
        """Save the item in a worker thread; return the result as a Deferred."""
        return deferToThread(self._process_item, item)

    def _process_item(self, item):
        """Insert *item* into MongoDB.

        Returns the item on a successful insert, None if the write was not
        acknowledged, and raises DropItem on a duplicate title.
        """
        data = {
            'title': item.get('title', ''),
            'link': item.get('link', ''),
            'content': item.get('content', ''),
        }
        try:
            # insert_one replaces the deprecated Collection.insert().
            result = self.demo_items.insert_one(data)
        except DuplicateKeyError:
            raise DropItem("Item already exist %s" % item)
        return item if result.acknowledged else None