Source code for exporters.readers.hubstorage_reader

import six
from exporters.readers.base_reader import BaseReader
from exporters.records.base_record import BaseRecord
from exporters.utils import str_list


[docs]class HubstorageReader(BaseReader): """ This reader retrieves items from Scrapinghub Hubstorage collections. - batch_size (int) Number of items to be returned in each batch - apikey (str) API key with access to the project where the items are being generated. - project_id (int or str) Id of the project. - collection_name (str) Name of the collection of items. - count (int) Number of records to read from collection. - prefixes (list) Only include records with given key prefixes. - exclude_prefixes (list) Exclude records with given key prefixes. - secondary_collections (list) A list of secondary collections to merge from. - startts (int or str) Either milliseconds since epoch, or date string. - endts (int or str) Either milliseconds since epoch, or date string. """ # List of options to set up the reader supported_options = { 'batch_size': {'type': six.integer_types, 'default': 10000}, 'apikey': {'type': six.string_types, 'env_fallback': 'EXPORTERS_HS_APIKEY'}, 'project_id': {'type': six.integer_types + six.string_types}, 'collection_name': {'type': six.string_types}, 'count': {'type': six.integer_types, 'default': 0}, 'prefixes': {'type': str_list, 'default': []}, 'exclude_prefixes': {'type': str_list, 'default': []}, 'secondary_collections': {'type': str_list, 'default': []}, 'startts': {'type': six.integer_types + six.string_types, 'default': None}, 'endts': {'type': six.integer_types + six.string_types, 'default': None}, } def __init__(self, *args, **kwargs): super(HubstorageReader, self).__init__(*args, **kwargs) self.batch_size = self.read_option('batch_size') self.collection_scanner = self._create_collection_scanner() self.logger.info( 'HubstorageReader has been initiated. ' 'Project id: {}. Collection name: {}'.format( self.read_option('project_id'), self.read_option('collection_name')) ) self.last_position = {} def _create_collection_scanner(self): from collection_scanner import CollectionScanner return CollectionScanner(self.read_option('apikey'), str(self.read_option('project_id')), self.read_option('collection_name'), batchsize=self.batch_size, startafter=self.last_position.get('last_key', None), count=self.read_option('count'), prefix=self.read_option('prefixes'), exclude_prefixes=self.read_option('exclude_prefixes'), secondary_collections=self.read_option('secondary_collections'), startts=self.read_option('startts'), endts=self.read_option('endts'), meta=['_key'])
[docs] def get_next_batch(self): """ This method is called from the manager. It must return a list or a generator of BaseRecord objects. When it has nothing else to read, it must set class variable "finished" to True. """ if self.collection_scanner.is_enabled: batch = self.collection_scanner.get_new_batch() for item in batch: base_item = BaseRecord(item) self.increase_read() self.last_position['last_key'] = item['_key'] yield base_item self.logger.debug('Done reading batch') else: self.logger.debug('No more batches') self.finished = True
[docs] def set_last_position(self, last_position): """ Called from the manager, it is in charge of updating the last position of data commited by the writer, in order to have resume support """ if last_position: if isinstance(last_position, six.string_types): last_key = last_position else: last_key = last_position.get('last_key', '') self.last_position = dict(last_key=last_key) self.collection_scanner.set_startafter(last_key) else: self.last_position = {}