Source code for exporters.contrib.writers.odo_writer

import six
import json
import gzip
from exporters.default_retries import retry_long

from exporters.writers.base_writer import BaseWriter


[docs]class ODOWriter(BaseWriter): """ Writes items to a odo destination. https://odo.readthedocs.org/en/latest/ Needed parameters: - schema (object) schema object. - odo_uri (str) ODO valid destination uri. """ requirements = { 'schema': {'type': object, 'required': True}, 'odo_uri': {'type': six.string_types, 'required': True} } def __init__(self, options): super(ODOWriter, self).__init__(options) from flatson import Flatson schema = self.read_option('schema', None) self.odo_uri = self.read_option('odo_uri', None) self.flatson = Flatson(schema) self.logger.info('ODOWriter has been initiated. Writing to: {}'.format(self.odo_uri)) @retry_long
[docs] def write(self, dump_path, group_key=''): from odo import odo, resource, discover import pandas as pd with gzip.open(dump_path) as f: lines = [json.loads(line.replace('\n', '')) for line in f.readlines()] flattened_lines = (self.flatson.flatten(line) for line in lines) pf = pd.DataFrame(flattened_lines, columns=self.flatson.fieldnames) dshape = discover(pf) odo(pf, resource(self.odo_uri), dshape=dshape)