import six
from exporters.default_retries import retry_generator
from exporters.readers.base_reader import BaseReader
from exporters.iterio import cohere_stream
from exporters.decompressors import ZLibDecompressor
from exporters.deserializers import JsonLinesDeserializer


class StreamBasedReader(BaseReader):
    """
    Abstract reader for storage backends that operate on bytes
    instead of JSON objects.

    The bytes are first decompressed using a decompressor and then
    deserialized using a deserializer.

    Available options:

    - batch_size (int)
        Number of items to be returned in each batch
    """

    # List of options used to set up the reader
    supported_options = {
        'batch_size': {'type': six.integer_types, 'default': 10000},
    }
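
    # A hedged example of an exporters config fragment selecting a concrete
    # stream-based reader (the reader name below is hypothetical):
    #
    #   "reader": {
    #       "name": "exporters.readers.some_stream_reader.SomeStreamReader",
    #       "options": {"batch_size": 5000}
    #   }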

    def __init__(self, *args, **kwargs):
        super(StreamBasedReader, self).__init__(*args, **kwargs)
        self.iterator = None
        self.batch_size = self.read_option('batch_size')
        # Default pipeline: zlib decompression followed by JSON lines
        # deserialization. These must be bound to the instance, since
        # iteritems_retrying() accesses them as self.decompressor and
        # self.deserializer; subclasses may override them.
        self.decompressor = ZLibDecompressor({}, None)
        self.deserializer = JsonLinesDeserializer({}, None)

    @retry_generator
    def iteritems_retrying(self, stream_data):
        # Skip streams that a previous run already consumed in full.
        if stream_data.filename in self.last_position['readed_streams']:
            return
        stream = cohere_stream(self.open_stream(stream_data))
        try:
            stream = self.decompressor.decompress(stream)
            stream = cohere_stream(stream)
            items_readed = 0
            stream_offset = self.last_position['stream_offset']
            items_offset = stream_offset.get(stream_data.filename, 0)
            for item in self.deserializer.deserialize(stream):
                items_readed += 1
                # Only yield items past the saved offset, so a resumed or
                # retried run does not emit duplicates.
                if items_readed > items_offset:
                    stream_offset[stream_data.filename] = items_readed
                    yield item
        finally:
            stream.close()
        # Only reached once the stream has been read in full: record it as
        # done and drop its offset entry (pop tolerates streams that never
        # stored an offset, e.g. empty ones).
        self.last_position['readed_streams'].append(stream_data.filename)
        stream_offset.pop(stream_data.filename, None)
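
    # Note: @retry_generator re-invokes this generator on transient failures,
    # and the offsets kept in last_position['stream_offset'] make each retry
    # skip the items that were already yielded instead of duplicating them.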

    def iteritems(self):
        for stream in self.get_read_streams():
            for record in self.iteritems_retrying(stream):
                yield record
        self.finished = True

    def get_next_batch(self):
        """
        This method is called from the manager. It must return a list or a
        generator of BaseRecord objects. When it has nothing else to read,
        it must set the class variable "finished" to True.
        """
        if self.iterator is None:
            self.iterator = self.iteritems()
        count = 0
        while count < self.batch_size:
            count += 1
            try:
                yield next(self.iterator)
            except StopIteration:
                # iteritems() has set self.finished by now; return instead of
                # leaking StopIteration, which PEP 479 turns into RuntimeError.
                return
        self.logger.debug('Done reading batch')
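
    # A hedged sketch of how a manager might drain batches; 'process' and the
    # construction of 'reader' are hypothetical, not exporters API:
    #
    #   while not reader.finished:
    #       for record in reader.get_next_batch():
    #           process(record)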

    def get_read_streams(self):
        """
        To be implemented by subclasses: must return an iterable of stream
        descriptors to read (see the illustrative sketch at the end of this
        module).
        """
        raise NotImplementedError()

    def set_last_position(self, last_position):
        """
        Called from the manager. It is in charge of updating the last
        position of data committed by the writer, in order to support
        resuming.
        """
        last_position = last_position or {}
        last_position.setdefault('readed_streams', [])
        last_position.setdefault('stream_offset', {})
        self.last_position = last_position
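
    # The resume state managed above has this shape (filenames and counts
    # are invented examples):
    #
    #   {
    #       'readed_streams': ['part-000.jl'],       # fully consumed
    #       'stream_offset': {'part-001.jl': 1200},  # items already yielded
    #   }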


def is_stream_reader(reader):
    return isinstance(reader, StreamBasedReader)
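

# --- Illustrative sketch (not part of the exporters library) ----------------
# A minimal, hypothetical subclass showing the two extension points the base
# class relies on: get_read_streams() and open_stream(). '_StreamData' and
# 'LocalFileStreamReader' are invented names; the base class only needs
# stream descriptors that expose a 'filename' attribute.
import collections
import glob

_StreamData = collections.namedtuple('_StreamData', ['filename'])


class LocalFileStreamReader(StreamBasedReader):
    """Reads zlib-compressed JSON lines files from a local directory."""

    def get_read_streams(self):
        # Sorted order keeps the resume offsets stable across runs.
        for path in sorted(glob.glob('/tmp/exports/*.jl')):
            yield _StreamData(filename=path)

    def open_stream(self, stream_data):
        # The base class wraps the returned file object with cohere_stream()
        # and feeds it through the decompressor/deserializer pipeline.
        return open(stream_data.filename, 'rb')

# An instance of this subclass would satisfy is_stream_reader() above.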