Source code for exporters.writers.hs_reduce_writer

import re
from .reduce_writer import ReduceWriter
from exporters.exceptions import ConfigurationError
from collections import MutableMapping
import six


COLLECTION_REGEX = '.*[.]scrapinghub[.]com/p/(\d+)/collections/s/([^/]+)/?$'


[docs]class HubstorageReduceWriter(ReduceWriter): """ This writer allow exporters to make aggregation of items data and push results into Scrapinghub Hubstorage collections - code (str) Python code defining a reduce_function(item, accumulator=None) - collection_url (str) Hubstorage Collection URL - key (str) Element key where to push the accumulated result - apikey (dict) Hubstorage API key """ supported_options = { "collection_url": { 'type': six.string_types, 'help': 'Hubstorage Collection URL' }, 'key': { 'type': six.string_types, 'help': 'Element key where to push the accumulated result' }, 'apikey': {'type': six.string_types, 'help': 'Hubstorage API key'}, } supported_options.update(ReduceWriter.supported_options) def __init__(self, *args, **kwargs): super(HubstorageReduceWriter, self).__init__(*args, **kwargs) self.collection = self._get_collection() self.element_key = self.read_option('key') collection_url = self.read_option('collection_url') self.logger.info('Will write accumulator to: {}#/details/{}' .format(collection_url, self.element_key))
[docs] def get_result(self, **extra): result = dict(self.reduced_result if isinstance(self.reduced_result, MutableMapping) else dict(value=self.reduced_result)) result['_key'] = self.element_key result.update(extra) return result
[docs] def write_batch(self, batch): super(HubstorageReduceWriter, self).write_batch(batch) self.collection.set(self.get_result(_finished=False))
[docs] def finish_writing(self): self.collection.set(self.get_result(_finished=True))
def _get_collection(self): collection_url = self.read_option('collection_url') match = re.match(COLLECTION_REGEX, collection_url) if not match: raise ConfigurationError("Invalid collection_url: %s" % collection_url) project, collection_name = match.groups() import hubstorage client = hubstorage.HubstorageClient(self.read_option('apikey')) return client.get_project(project).collections.new_store(collection_name)