# Source code for exporters.readers.fs_reader

import os
import re

from exporters.readers.base_stream_reader import StreamBasedReader
from exporters.exceptions import ConfigurationError
from exporters.bypasses.stream_bypass import Stream


class FSReader(StreamBasedReader):
    r"""
    Reads items from files located in filesystem and compressed with gzip with a common path.

        - input (str/dict or a list of str/dict)
            Specification of files to be read.

            Accepts either one "input_unit" or many of them in a list.
            "input_unit" is defined as follows:

            If a string, it indicates a filename, e.g. "/path/to/filename".

            If a dictionary, it indicates a directory to be read with the
            following elements:

            - "dir": path to directory, e.g. "/path/to/dir".

            - "dir_pointer": path to file containing path to directory,
              e.g. "/path/to/pointer/file" which contains "/path/to/dir".
              Cannot be used together with "dir".

              For example:

                We have a weekly export set with CRON. If we wanted to point
                to a new data path every week, we should keep updating the
                export configuration. With a pointer, we can set the reader
                to read from that file, which contains one line with a valid
                path to datasets, so only that pointer file should be updated.

            - "pattern": (optional) regular expression to filter filenames,
              e.g. "output.*\.jl\.gz$". It is searched against the full file
              path (directory joined with the filename).

            - "include_dot_files": (optional, default False) when false, files
              whose basename starts with "." are skipped.
    """

    # List of options to set up the reader
    supported_options = {
        'input': {'type': (str, dict, list), 'default': {'dir': ''}}
    }

    def __init__(self, *args, **kwargs):
        super(FSReader, self).__init__(*args, **kwargs)
        self.input_specification = self.read_option('input')
        # The file list is resolved once, at construction time.
        self.files = self._get_input_files(self.input_specification)
        # NOTE(review): the three attributes below are not read anywhere in
        # this class; presumably they are bookkeeping for the base reader --
        # confirm before removing.
        self.read_files = []
        self.current_file = None
        self.last_line = 0
        self.logger.info('FSReader has been initiated')

    @classmethod
    def _get_input_files(cls, input_specification):
        """Get list of input files according to input definition.

        Input definition can be:

        - str: specifying a filename

        - list of str: specifying a list of filenames

        - dict with "dir" (or "dir_pointer") and optional "pattern" and
          "include_dot_files" parameters: specifying the toplevel directory
          under which input files will be sought and an optional filepath
          pattern

        A list may freely mix strings and dicts.

        Returns the list of file paths, in the order the input units were
        given. Raises ConfigurationError when the specification (or one of
        its units) has an unsupported type, or when a dict unit has neither
        or both of "dir" and "dir_pointer".
        """
        # ``basestring`` only exists on Python 2; fall back to ``str`` so the
        # reader also works on Python 3 instead of dying with a NameError.
        try:
            string_types = basestring  # noqa: F821
        except NameError:
            string_types = str

        if isinstance(input_specification, (string_types, dict)):
            input_specification = [input_specification]
        elif not isinstance(input_specification, list):
            raise ConfigurationError("Input specification must be string, list or dict.")

        # Sentinel distinguishing "key absent" from any explicit value
        # (including None or an empty string).
        missing = object()

        out = []
        for input_unit in input_specification:
            if isinstance(input_unit, string_types):
                out.append(input_unit)
            elif isinstance(input_unit, dict):
                directory = input_unit.get('dir', missing)
                dir_pointer = input_unit.get('dir_pointer', missing)
                if directory is missing and dir_pointer is missing:
                    raise ConfigurationError(
                        'Input directory dict must contain'
                        ' "dir" or "dir_pointer" element (but not both)')
                if directory is not missing and dir_pointer is not missing:
                    raise ConfigurationError(
                        'Input directory dict must not contain'
                        ' both "dir" and "dir_pointer" elements')
                if dir_pointer is not missing:
                    # Indirection: the pointer file holds the real directory.
                    directory = cls._get_pointer(dir_pointer)

                out.extend(cls._get_directory_files(
                    directory=directory,
                    pattern=input_unit.get('pattern'),
                    include_dot_files=input_unit.get('include_dot_files', False)))
            else:
                raise ConfigurationError('Input must only contain strings or dicts')
        return out

    @classmethod
    def _get_pointer(cls, path_pointer):
        """
        Given a pointer path extracts the path to read the datasets from.

        The whole pointer file is read and surrounding whitespace (including
        the trailing newline) is stripped.
        """
        with open(path_pointer) as f:
            return f.read().strip()

    @classmethod
    def _get_directory_files(cls, directory, pattern=None, include_dot_files=False):
        """
        Recursively list the files under ``directory``.

        ``pattern``, when given, is a regular expression searched against the
        full file path. Unless ``include_dot_files`` is true, files whose
        basename starts with "." are left out; only the file name is checked,
        so files inside dot-directories are still returned.
        """
        # Every predicate in this list must accept a path for it to be kept.
        match_funcs = []

        if pattern is not None:
            match_funcs.append(re.compile(pattern).search)

        if not include_dot_files:
            def is_non_dot_file(filepath):
                return not os.path.basename(filepath).startswith('.')
            match_funcs.append(is_non_dot_file)

        return [
            filepath
            for dirpath, directories, filenames in os.walk(directory)
            for filepath in (os.path.join(dirpath, f) for f in filenames)
            if all(mf(filepath) for mf in match_funcs)
        ]

    def open_stream(self, stream):
        """Open the file behind ``stream`` in binary mode; the caller closes it."""
        return open(stream.filename, 'rb')

    def get_read_streams(self):
        """Yield one ``Stream`` (path, size in bytes, None) per input file, sorted by path."""
        for fpath in sorted(self.files):
            size = os.path.getsize(fpath)
            yield Stream(fpath, size, None)