import datetime
import traceback
from collections import OrderedDict
from contextlib import closing
from exporters.default_retries import disable_retries
from exporters.exporter_config import ExporterConfig
from exporters.logger.base_logger import ExportManagerLogger
from exporters.meta import ExportMeta
from exporters.module_loader import ModuleLoader
from exporters.notifications.notifiers_list import NotifiersList
from exporters.notifications.receiver_groups import CLIENTS, TEAM
from exporters.writers.base_writer import ItemsLimitReached
from exporters.readers.base_stream_reader import is_stream_reader
from six.moves.queue import Empty, Queue
from threading import Thread
import time
class BaseExporter(object):
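    """Core export pipeline.

    Reads batches from a reader, runs them through filters, a transform and
    a grouper, hands them to a writer, and commits the reader position to a
    persistence backend after each written batch so interrupted jobs can be
    resumed.
    """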
def __init__(self, configuration):
self.config = ExporterConfig(configuration)
self.threaded = self.config.exporter_options.get('threaded', False)
self.queue_size = self.config.exporter_options.get('thread_queue_size', 100)
self.logger = ExportManagerLogger(self.config.log_options)
self.module_loader = ModuleLoader()
metadata = ExportMeta(configuration)
self.metadata = metadata
self.reader = self.module_loader.load_reader(
self.config.reader_options, metadata)
if is_stream_reader(self.reader):
deserializer = self.module_loader.load_deserializer(
self.config.deserializer_options, metadata)
decompressor = self.module_loader.load_decompressor(
self.config.decompressor_options, metadata)
self.reader.deserializer = deserializer
self.reader.decompressor = decompressor
self.filter_before = self.module_loader.load_filter(
self.config.filter_before_options, metadata)
self.filter_after = self.module_loader.load_filter(
self.config.filter_after_options, metadata)
self.transform = self.module_loader.load_transform(
self.config.transform_options, metadata)
self.export_formatter = self.module_loader.load_formatter(
self.config.formatter_options, metadata)
self.writer = self.module_loader.load_writer(
self.config.writer_options, metadata, export_formatter=self.export_formatter)
self.persistence = self.module_loader.load_persistence(
self.config.persistence_options, metadata)
self.grouper = self.module_loader.load_grouper(
self.config.grouper_options, metadata)
self.notifiers = NotifiersList(self.config.notifiers, metadata)
if self.config.disable_retries:
disable_retries()
        self.logger.debug('{} has been initialized'.format(self.__class__.__name__))
self.stats_manager = self.module_loader.load_stats_manager(
self.config.stats_options, metadata)
self.bypass_cases = []
def _run_pipeline_iteration(self):
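        """Run one batch through the pipeline: read, filter, transform,
        filter again, group, write, then commit the reader position.
        A timestamp is recorded after each stage for the stats report.
        """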
times = OrderedDict([('started', datetime.datetime.now())])
self.logger.debug('Getting new batch')
if self.config.exporter_options.get('forced_reads'):
next_batch = list(self.reader.get_next_batch())
else:
next_batch = self.reader.get_next_batch()
times.update(read=datetime.datetime.now())
next_batch = self.filter_before.filter_batch(next_batch)
times.update(filtered=datetime.datetime.now())
next_batch = self.transform.transform_batch(next_batch)
times.update(transformed=datetime.datetime.now())
next_batch = self.filter_after.filter_batch(next_batch)
times.update(filtered_after=datetime.datetime.now())
next_batch = self.grouper.group_batch(next_batch)
times.update(grouped=datetime.datetime.now())
try:
self.writer.write_batch(batch=next_batch)
times.update(written=datetime.datetime.now())
last_position = self._get_last_position()
self.persistence.commit_position(last_position)
times.update(persisted=datetime.datetime.now())
except ItemsLimitReached:
            # Some records were written before the items limit was reached;
            # report the iteration stats, then let the exception propagate.
times.update(written=datetime.datetime.now())
self._iteration_stats_report(times)
raise
else:
self._iteration_stats_report(times)
def _get_last_position(self):
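        """Bundle the writer metadata with the reader position so both can
        be restored on resume.
        """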
last_position = self.reader.get_last_position()
last_position['writer_metadata'] = self.writer.get_all_metadata()
return last_position
def _init_export_job(self):
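        """Notify receivers that the dump started and, if a position was
        persisted by a previous run, restore the reader and writer from it.
        """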
self.notifiers.notify_start_dump(receivers=[CLIENTS, TEAM])
last_position = self.persistence.get_last_position()
if last_position is not None:
self.writer.update_metadata(last_position.get('writer_metadata'))
self.metadata.accurate_items_count = last_position.get('accurate_items_count', False)
self.reader.set_last_position(last_position)
    def _clean_export_job(self):
        # Close the reader first, but always close the writer even if the
        # reader fails to close.
        try:
            self.reader.close()
        finally:
            self.writer.close()
def _finish_export_job(self):
self.writer.finish_writing()
self.metadata.end_time = datetime.datetime.now()
    def bypass_exporter(self, bypass_class):
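        """Run the export through the given bypass class instead of the
        regular pipeline, folding its item count into the writer metadata.
        """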
self.logger.info('Executing bypass {}.'.format(bypass_class.__name__))
self.notifiers.notify_start_dump(receivers=[CLIENTS, TEAM])
if not self.config.exporter_options.get('resume'):
self.persistence.close()
self.persistence.delete()
with closing(bypass_class(self.config, self.metadata)) as bypass:
bypass.execute()
if not bypass.valid_total_count:
self.metadata.accurate_items_count = False
self.logger.warning('No accurate items count info can be retrieved')
self.writer.set_metadata(
'items_count', self.writer.get_metadata('items_count') + bypass.total_items)
self.logger.info(
'Finished executing bypass {}.'.format(bypass_class.__name__))
self._final_stats_report()
self.notifiers.notify_complete_dump(receivers=[CLIENTS, TEAM])
    def bypass(self):
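        """Try each registered bypass case in order and run the first one
        whose conditions are met. Returns True if a bypass was executed.
        """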
if self.config.prevent_bypass:
return False
for bypass_class in self.bypass_cases:
if bypass_class.meets_conditions(self.config):
try:
self.bypass_exporter(bypass_class)
return True
finally:
self._clean_export_job()
return False
    def _handle_export_exception(self, exception):
        self.logger.error(traceback.format_exc())
        self.logger.error(str(exception))
        self.notifiers.notify_failed_job(
            str(exception), traceback.format_exc(), receivers=[TEAM])
def _iteration_stats_report(self, times):
try:
self.stats_manager.iteration_report(times)
        except Exception as e:
            traceback.print_exc()
            self.logger.error('Error making stats report: {}'.format(str(e)))
def _final_stats_report(self):
try:
self.stats_manager.final_report()
except Exception as e:
self.logger.error('Error making final stats report: {}'.format(str(e)))
def _run_pipeline(self):
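        """Single-threaded mode: iterate until the reader is exhausted or
        the writer hits its items limit, then flush the writer.
        """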
while not self.reader.is_finished():
try:
self._run_pipeline_iteration()
except ItemsLimitReached as e:
self.logger.info('{!r}'.format(e))
break
self.writer.flush()
def _reader_thread(self):
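        """Producer thread: push batches onto process_queue, throttling
        when the queue is more than half full.
        """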
self.logger.info('Starting reader thread')
while not self.reader.is_finished():
self.process_queue.put(list(self.reader.get_next_batch()))
qsize = self.process_queue.qsize()
            if qsize > 0.5 * self.queue_size:
                # The queue is filling up; throttle the reader so the process
                # and write threads can keep up. The sleep scales linearly
                # from 0s at half full to 5s at completely full.
                time.sleep((qsize * 10.0 / self.queue_size) - 5)
self.reader_finished = True
    def _process_thread(self):
        self.logger.info('Starting processing thread')
        while not self.reader_finished or not self.process_queue.empty():
            # Use a timeout so the loop re-checks reader_finished instead of
            # blocking forever if the reader finishes between the empty()
            # check and this get().
            try:
                next_batch = self.process_queue.get(timeout=1)
            except Empty:
                continue
            next_batch = self.filter_before.filter_batch(next_batch)
            next_batch = self.transform.transform_batch(next_batch)
            next_batch = self.filter_after.filter_batch(next_batch)
            next_batch = self.grouper.group_batch(next_batch)
            self.writer_queue.put(next_batch)
        self.process_finished = True
    def _writer_thread(self):
        self.logger.info('Starting writer thread')
        while not self.process_finished or not self.writer_queue.empty():
            # Same timeout pattern as _process_thread to avoid blocking
            # forever on an empty queue after the processor finishes.
            try:
                batch = self.writer_queue.get(timeout=1)
            except Empty:
                continue
            self.writer.write_batch(batch=batch)
        self.writer.finish_writing()
        self.writer.flush()
def _run_threads(self):
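        """Threaded mode: run read, process and write stages as three
        threads connected by bounded queues.
        """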
self.reader_finished = False
self.process_finished = False
self.process_queue = Queue(self.queue_size)
self.writer_queue = Queue(self.queue_size)
reader_thread = Thread(target=self._reader_thread)
process_thread = Thread(target=self._process_thread)
writer_thread = Thread(target=self._writer_thread)
reader_thread.start()
process_thread.start()
writer_thread.start()
reader_thread.join()
process_thread.join()
writer_thread.join()
    def export(self):
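        """Entry point: run a matching bypass if one applies; otherwise run
        the pipeline (threaded or single-threaded), report final stats and
        notify receivers of completion or failure.
        """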
if not self.bypass():
try:
self._init_export_job()
if self.threaded:
self._run_threads()
else:
self._run_pipeline()
self._finish_export_job()
self._final_stats_report()
self.persistence.close()
self.notifiers.notify_complete_dump(receivers=[CLIENTS, TEAM])
except Exception as e:
self._handle_export_exception(e)
raise
finally:
self._clean_export_job()
else:
self.metadata.bypassed_pipeline = True
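# Minimal usage sketch. The module paths and option schemas below are
# illustrative placeholders; the real plugin names and options come from the
# exporters documentation. Only 'threaded' and 'thread_queue_size' are taken
# directly from the code above.
#
#     config = {
#         'reader': {'name': 'exporters.readers.SomeReader', 'options': {}},
#         'writer': {'name': 'exporters.writers.SomeWriter', 'options': {}},
#         'exporter_options': {'threaded': True, 'thread_queue_size': 100},
#     }
#     BaseExporter(config).export()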