import datetime
import json
import re
import six
import yaml
from sqlalchemy import Boolean, Column, DateTime, Integer, Text, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from exporters.persistence.base_persistence import BasePersistence
Base = declarative_base()
[docs]class Job(Base):
__tablename__ = 'job'
id = Column(Integer, primary_key=True)
last_position = Column(Text, nullable=False)
last_committed = Column(DateTime)
job_finished = Column(Boolean)
configuration = Column(Text, nullable=False)
[docs]class BaseAlchemyPersistence(BasePersistence):
supported_options = {
'user': {'type': six.string_types},
'password': {'type': six.string_types},
'host': {'type': six.string_types},
'port': {'type': six.integer_types},
'database': {'type': six.string_types}
}
PROTOCOL = None
# example: mysql://username:password@host:port/database/JOB_ID
persistence_uri_re = (r'(?P<proto>[a-z]+)://(?P<user>.+):(?P<password>.+)@'
r'(?P<host>.+):(?P<port>\d+)/(?P<database>.+)/(?P<job_id>\d+)')
def __init__(self, *args, **kwargs):
self.engine = None
super(BaseAlchemyPersistence, self).__init__(*args, **kwargs)
def _db_init(self):
db_uri = self.build_db_conn_uri(
proto=self.PROTOCOL,
user=self.read_option('user'),
password=self.read_option('password'),
host=self.read_option('host'),
port=self.read_option('port'),
database=self.read_option('database'),
)
self.engine = create_engine(db_uri)
Base.metadata.create_all(self.engine)
Base.metadata.bind = self.engine
DBSession = sessionmaker(bind=self.engine)
self.session = DBSession()
[docs] def get_last_position(self):
if not self.engine:
self._db_init()
job = self.session.query(Job).filter(Job.id == self.persistence_state_id).first()
return json.loads(job.last_position)
[docs] def commit_position(self, last_position=None):
self.last_position = last_position
self.session.query(Job).filter(Job.id == self.persistence_state_id).update(
{"last_position": json.dumps(self.last_position),
"last_committed": datetime.datetime.now()}, synchronize_session='fetch')
self.session.commit()
self.logger.debug('Commited batch number ' + str(self.last_position) +
' of job: ' + str(self.persistence_state_id))
self.set_metadata('commited_positions',
self.get_metadata('commited_positions') + 1)
[docs] def generate_new_job(self):
if not self.engine:
self._db_init()
new_job = Job(last_position='None', configuration=json.dumps(self.configuration))
self.session.add(new_job)
self.session.commit()
self.persistence_state_id = new_job.id
self.logger.debug(
'Created persistence job with id {} in database {}. Using protocol {}.{}'.format(
new_job.id, self.read_option('database'), self.PROTOCOL, str(new_job.id)))
return new_job.id
[docs] def close(self):
self.session.query(Job).filter(Job.id == self.persistence_state_id).update(
dict(job_finished=True, last_committed=datetime.datetime.now()),
synchronize_session='fetch'
)
self.session.commit()
self.session.close()
@classmethod
[docs] def build_db_conn_uri(cls, **kwargs):
"""Build the database connection URI from the given keyword arguments
"""
return '{proto}://{user}:{password}@{host}:{port}/{database}'.format(**kwargs)
@classmethod
[docs] def parse_persistence_uri(cls, persistence_uri):
"""Parse a database URI and the persistence state ID from
the given persistence URI
"""
regex = cls.persistence_uri_re
match = re.match(regex, persistence_uri)
if not match:
raise ValueError("Couldn't parse persistence URI: %s -- regex: %s)"
% (persistence_uri, regex))
conn_params = match.groupdict()
missing = {'proto', 'job_id', 'database'} - set(conn_params)
if missing:
raise ValueError('Missing required parameters: %s (given params: %s)'
% (tuple(missing), conn_params))
persistence_state_id = int(conn_params.pop('job_id'))
db_uri = cls.build_db_conn_uri(**conn_params)
return db_uri, persistence_state_id
@classmethod
[docs] def configuration_from_uri(cls, persistence_uri):
"""
Return a configuration object.
"""
db_uri, persistence_state_id = cls.parse_persistence_uri(persistence_uri)
engine = create_engine(db_uri)
Base.metadata.create_all(engine)
Base.metadata.bind = engine
DBSession = sessionmaker(bind=engine)
session = DBSession()
job = session.query(Job).filter(Job.id == persistence_state_id).first()
configuration = job.configuration
configuration = yaml.safe_load(configuration)
configuration['exporter_options']['resume'] = True
configuration['exporter_options']['persistence_state_id'] = persistence_state_id
return configuration
_docstring = """
Manage export persistence using a {protocol} database as a backend.
It will add a row for every job in a table called Jobs.
- user (str)
Username with access to {protocol} database
- password (str)
Password string
- host (str)
DB server host ip
- port (int)
DB server port
- database (str)
Name of the database in which store jobs persistence
"""
[docs]class MysqlPersistence(BaseAlchemyPersistence):
PROTOCOL = 'mysql'
__doc__ = _docstring.format(protocol=PROTOCOL)
[docs]class PostgresqlPersistence(BaseAlchemyPersistence):
PROTOCOL = 'postgresql'
__doc__ = _docstring.format(protocol=PROTOCOL)
[docs]class SqlitePersistence(BaseAlchemyPersistence):
PROTOCOL = 'postgresql'
__doc__ = _docstring.format(protocol=PROTOCOL)
# sqlite://path/to/file.db:JOB_ID
persistence_uri_re = '(?P<proto>sqlite)://(?P<database>.+):(?P<job_id>\d+)'
supported_options = {
# set defaults for unneeded options
'user': {'type': six.string_types, 'default': ''},
'password': {'type': six.string_types, 'default': ''},
'host': {'type': six.string_types, 'default': ''},
'port': {'type': int, 'default': ''},
}
@classmethod
[docs] def build_db_conn_uri(self, **kwargs):
return 'sqlite+pysqlite:///%s' % kwargs.pop('database')