Source code for exporters.bypasses.stream_bypass

import logging
from collections import namedtuple
from contextlib import closing

from exporters.bypasses.base import BaseBypass
from exporters.module_loader import ModuleLoader
from exporters.iterio import cohere_stream

Stream = namedtuple('Stream', 'filename size meta')
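

# Illustrative sketch, not part of the original module: building the Stream
# record that readers yield. The filename, size, and meta values are invented.
def _stream_example():
    stream = Stream(filename='output/dump_0.jl.gz', size=1024, meta={})
    # size is in bytes; it feeds StreamBypassState's bytes_copied stat.
    return stream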


class StreamBypassState(object):
    """Persisted state of a stream bypass, used to resume interrupted copies."""

    def __init__(self, config, metadata):
        module_loader = ModuleLoader()
        self.state = module_loader.load_persistence(config.persistence_options, metadata)
        self.state_position = self.state.get_last_position()
        if not self.state_position:
            # Fresh run: start from an empty position.
            self.done = []
            self.skipped = []
            self.stats = {'bytes_copied': 0}
            self.state.commit_position(self._get_state())
        else:
            # Resumed run: streams recorded as done by the previous run
            # become skipped, so they are not copied again.
            self.done = []
            self.skipped = self.state_position['done']
            self.stats = self.state_position.get('stats', {'bytes_copied': 0})

    def _get_state(self):
        return dict(done=self.done, skipped=self.skipped, stats=self.stats)

    def commit_copied(self, stream):
        self.increment_bytes(stream.size)
        self.done.append(stream)
        self.state.commit_position(self._get_state())

    def increment_bytes(self, cnt):
        self.stats['bytes_copied'] += cnt

    def delete(self):
        self.state.delete()
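

# Illustrative sketch, not part of the original module: how the persisted
# position evolves. bypass_state stands in for a real StreamBypassState and
# stream for a real Stream record.
def _state_shape_example(bypass_state, stream):
    before = bypass_state.stats['bytes_copied']
    bypass_state.commit_copied(stream)
    # _get_state() is now persisted as something like:
    #     {'done': [stream], 'skipped': [], 'stats': {'bytes_copied': before + stream.size}}
    return bypass_state.stats['bytes_copied'] - before  # == stream.size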


class StreamBypass(BaseBypass):
    """
    Generic bypass that streams the contents of the files instead of running
    them through the export pipeline. It should be transparent to the user.

    Conditions are:

        - The reader module supports get_read_streams.
        - The writer module supports write_stream.
        - No filter modules are set up.
        - No transform module is set up.
        - No grouper module is set up.
        - The writer has no items_limit option set in its configuration.
        - The writer uses the default items_per_buffer_write and
          size_per_buffer_write values.
        - The writer uses the default write_buffer.
    """

    def __init__(self, config, metadata):
        super(StreamBypass, self).__init__(config, metadata)
        self.bypass_state = None
        self.logger = logging.getLogger('bypass_logger')
        self.logger.setLevel(logging.INFO)

    @classmethod
    def meets_conditions(cls, config):
        if not config.filter_before_options['name'].endswith('NoFilter'):
            cls._log_skip_reason('custom filter configured')
            return False
        if not config.filter_after_options['name'].endswith('NoFilter'):
            cls._log_skip_reason('custom filter configured')
            return False
        if not config.transform_options['name'].endswith('NoTransform'):
            cls._log_skip_reason('custom transform configured')
            return False
        if not config.grouper_options['name'].endswith('NoGrouper'):
            cls._log_skip_reason('custom grouper configured')
            return False
        if config.writer_options.get('options', {}).get('items_limit'):
            cls._log_skip_reason('items limit configuration (items_limit)')
            return False
        if config.writer_options.get('options', {}).get('items_per_buffer_write'):
            cls._log_skip_reason('buffer limit configuration (items_per_buffer_write)')
            return False
        if config.writer_options.get('options', {}).get('size_per_buffer_write'):
            cls._log_skip_reason('buffer limit configuration (size_per_buffer_write)')
            return False
        write_buffer = config.writer_options.get('options', {}).get('write_buffer')
        if write_buffer and not write_buffer.endswith('base.WriteBuffer'):
            cls._log_skip_reason('custom write buffer configuration')
            return False
        module_loader = ModuleLoader()
        try:
            with closing(module_loader.load_class(config.reader_options['name'])) as reader:
                pass
            with closing(module_loader.load_class(config.writer_options['name'])) as writer:
                pass
        except:
            cls._log_skip_reason("Can't load reader and/or writer")
            return False
        if not callable(getattr(reader, 'get_read_streams', None)) or \
                not callable(getattr(reader, 'open_stream', None)):
            cls._log_skip_reason("Reader doesn't support get_read_streams()/open_stream()")
            return False
        if not hasattr(writer, 'write_stream'):
            cls._log_skip_reason("Writer doesn't support write_stream()")
            return False
        return True
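
    # Illustrative sketch, not part of the original module: the option shapes
    # the checks above inspect. The module paths below are invented stand-ins.
    #
    #     config.filter_before_options  -> {'name': 'exporters.filters.no_filter.NoFilter'}
    #     config.filter_after_options   -> {'name': 'exporters.filters.no_filter.NoFilter'}
    #     config.transform_options      -> {'name': 'exporters.transform.no_transform.NoTransform'}
    #     config.grouper_options        -> {'name': 'exporters.groupers.no_grouper.NoGrouper'}
    #     config.writer_options         -> {'name': 'some.package.SomeWriter', 'options': {}}
    #
    # With empty writer options, items_limit and both buffer settings are
    # unset and write_buffer is absent, so every guard above passes.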

    def execute(self):
        # We can't count items on streamed bypasses.
        self.valid_total_count = False
        self.bypass_state = StreamBypassState(self.config, self.metadata)
        module_loader = ModuleLoader()
        reader = module_loader.load_reader(self.config.reader_options, self.metadata)
        writer = module_loader.load_writer(self.config.writer_options, self.metadata)
        with closing(reader), closing(writer):
            for stream in reader.get_read_streams():
                if stream not in self.bypass_state.skipped:
                    file_obj = cohere_stream(reader.open_stream(stream))
                    logging.log(logging.INFO, 'Starting to copy file {}'.format(stream.filename))
                    try:
                        writer.write_stream(stream, file_obj)
                    finally:
                        file_obj.close()
                    logging.log(logging.INFO, 'Finished copying file {}'.format(stream.filename))
                    self.bypass_state.commit_copied(stream)
                else:
                    logging.log(logging.INFO, 'Skip file {}'.format(stream.filename))

    def close(self):
        if self.bypass_state:
            self.bypass_state.delete()
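

# Illustrative sketch, not part of the original module: the minimal duck-typed
# contract that execute() relies on. _SketchReader and _SketchWriter are
# invented names; real reader/writer modules are loaded through ModuleLoader
# and also receive options and metadata.
class _SketchReader(object):
    def get_read_streams(self):
        return iter([])  # yield one Stream record per source file

    def open_stream(self, stream):
        raise NotImplementedError  # return a readable file-like object

    def close(self):
        pass  # required: execute() wraps the reader in contextlib.closing()


class _SketchWriter(object):
    def write_stream(self, stream, file_obj):
        raise NotImplementedError  # copy file_obj's bytes to the destination

    def close(self):
        pass  # required: execute() wraps the writer in contextlib.closing()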