exporters.persistence package

Submodules

exporters.persistence.alchemy_persistence module

class exporters.persistence.alchemy_persistence.BaseAlchemyPersistence(*args, **kwargs)[source]

Bases: exporters.persistence.base_persistence.BasePersistence

PROTOCOL = None
classmethod build_db_conn_uri(**kwargs)[source]

Build the database connection URI from the given keyword arguments

close()[source]
commit_position(last_position=None)[source]
classmethod configuration_from_uri(persistence_uri)[source]

Return a configuration object.

generate_new_job()[source]
get_last_position()[source]
classmethod parse_persistence_uri(persistence_uri)[source]

Parse a database URI and the persistence state ID from the given persistence URI

persistence_uri_re = '(?P<proto>[a-z]+)://(?P<user>.+):(?P<password>.+)@(?P<host>.+):(?P<port>\\d+)/(?P<database>.+)/(?P<job_id>\\d+)'
supported_options = {'database': {'type': (<type 'basestring'>,)}, 'host': {'type': (<type 'basestring'>,)}, 'user': {'type': (<type 'basestring'>,)}, 'password': {'type': (<type 'basestring'>,)}, 'port': {'type': (<type 'int'>, <type 'long'>)}}
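
To illustrate how `persistence_uri_re` splits a persistence URI, the following sketch applies the documented pattern with Python's standard `re` module. The sample URI and its credentials are invented for the example, and the real `parse_persistence_uri` may post-process the groups differently.

```python
import re

# Pattern copied from BaseAlchemyPersistence.persistence_uri_re above.
PERSISTENCE_URI_RE = (
    r'(?P<proto>[a-z]+)://(?P<user>.+):(?P<password>.+)'
    r'@(?P<host>.+):(?P<port>\d+)/(?P<database>.+)/(?P<job_id>\d+)'
)

# Hypothetical URI, made up for this example.
sample_uri = 'mysql://exporter:secret@localhost:3306/exports/42'

match = re.match(PERSISTENCE_URI_RE, sample_uri)
parts = match.groupdict()

# Every captured group is a string; convert the numeric ones explicitly.
port = int(parts['port'])
job_id = int(parts['job_id'])
```

The trailing `job_id` group is what identifies the persistence state; everything before it describes the database connection.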
class exporters.persistence.alchemy_persistence.Job(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

configuration
id
job_finished
last_committed
last_position
class exporters.persistence.alchemy_persistence.MysqlPersistence(*args, **kwargs)[source]

Bases: exporters.persistence.alchemy_persistence.BaseAlchemyPersistence

Manage export persistence using a MySQL database as a backend. It adds a row for every job to a table called Jobs.

  • user (str)

Username with access to the MySQL database

  • password (str)

Password string

  • host (str)

DB server host IP

  • port (int)

DB server port

  • database (str)

Name of the database in which to store job persistence

PROTOCOL = 'mysql'
supported_options = {'database': {'type': (<type 'basestring'>,)}, 'host': {'type': (<type 'basestring'>,)}, 'user': {'type': (<type 'basestring'>,)}, 'password': {'type': (<type 'basestring'>,)}, 'port': {'type': (<type 'int'>, <type 'long'>)}}
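
The five options map naturally onto a database URL. The sketch below is an assumption about what such a mapping could look like, using the standard SQLAlchemy URL layout `protocol://user:password@host:port/database`; `build_db_conn_uri_sketch` is a hypothetical helper and not the library's `build_db_conn_uri`, and the option values are invented.

```python
def build_db_conn_uri_sketch(protocol, user, password, host, port, database):
    """Hypothetical helper: format the options as a SQLAlchemy-style URL."""
    return '{}://{}:{}@{}:{}/{}'.format(
        protocol, user, password, host, port, database)


# Example option values, made up for illustration.
options = {
    'user': 'exporter',
    'password': 'secret',
    'host': 'localhost',
    'port': 3306,
    'database': 'exports',
}

db_uri = build_db_conn_uri_sketch('mysql', **options)
```

Passwords containing characters such as `@` or `/` would need URL-encoding before being placed in a URL of this shape.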
class exporters.persistence.alchemy_persistence.PostgresqlPersistence(*args, **kwargs)[source]

Bases: exporters.persistence.alchemy_persistence.BaseAlchemyPersistence

Manage export persistence using a PostgreSQL database as a backend. It adds a row for every job to a table called Jobs.

  • user (str)

Username with access to the PostgreSQL database

  • password (str)

Password string

  • host (str)

DB server host IP

  • port (int)

DB server port

  • database (str)

Name of the database in which to store job persistence

PROTOCOL = 'postgresql'
supported_options = {'database': {'type': (<type 'basestring'>,)}, 'host': {'type': (<type 'basestring'>,)}, 'user': {'type': (<type 'basestring'>,)}, 'password': {'type': (<type 'basestring'>,)}, 'port': {'type': (<type 'int'>, <type 'long'>)}}
class exporters.persistence.alchemy_persistence.SqlitePersistence(*args, **kwargs)[source]

Bases: exporters.persistence.alchemy_persistence.BaseAlchemyPersistence

Manage export persistence using a SQLite database as a backend. It adds a row for every job to a table called Jobs.

  • user (str)

Username with access to the database

  • password (str)

Password string

  • host (str)

DB server host IP

  • port (int)

DB server port

  • database (str)

Name of the database in which to store job persistence

PROTOCOL = 'postgresql'
classmethod build_db_conn_uri(**kwargs)[source]
persistence_uri_re = '(?P<proto>sqlite)://(?P<database>.+):(?P<job_id>\\d+)'
supported_options = {'database': {'type': (<type 'basestring'>,)}, 'host': {'default': '', 'type': (<type 'basestring'>,)}, 'user': {'default': '', 'type': (<type 'basestring'>,)}, 'password': {'default': '', 'type': (<type 'basestring'>,)}, 'port': {'default': '', 'type': <type 'int'>}}
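
`SqlitePersistence` overrides `persistence_uri_re` with a shorter pattern that carries only a database path and a job ID. A minimal sketch of that pattern applied with the standard `re` module follows; the sample path is invented.

```python
import re

# Pattern copied from SqlitePersistence.persistence_uri_re above.
SQLITE_URI_RE = r'(?P<proto>sqlite)://(?P<database>.+):(?P<job_id>\d+)'

# Hypothetical URI: a database file under /tmp and job ID 7.
sqlite_match = re.match(SQLITE_URI_RE, 'sqlite:///tmp/exports.db:7')
sqlite_parts = sqlite_match.groupdict()
```

Because `database` is greedy, the job ID is whatever follows the last colon that is directly followed by digits.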

exporters.persistence.base_persistence module

class exporters.persistence.base_persistence.BasePersistence(options, metadata)[source]

Bases: exporters.pipeline.base_pipeline_item.BasePipelineItem

Base class for persistence modules

close()[source]

Cleans up temporary files, closes remote connections, and so on.

commit_position(last_position)[source]

Commits a position that has been through the whole pipeline. The position can be any serializable object. This supports both the usual position abstraction (a batch number) and backend-specific abstractions such as Kafka offsets (which are a dict).

static configuration_from_uri(uri, regex)[source]

Returns a configuration object.

delete()[source]
generate_new_job()[source]

Creates and instantiates all that is needed to keep persistence (tmp files, remote connections...).

get_all_metadata(module='persistence')[source]
get_last_position()[source]

Returns the last committed position.

get_metadata(key, module='persistence')[source]
set_metadata(key, value, module='persistence')[source]
supported_options = {}
update_metadata(data, module='persistence')[source]
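
The commit and read-back contract described for `commit_position` and `get_last_position` can be sketched with a small in-memory stand-in. `InMemoryPersistenceSketch` is hypothetical: it does not inherit from `BasePersistence` and only mirrors the two method names to show that a position may be a plain batch number or a dict of offsets.

```python
import copy


class InMemoryPersistenceSketch(object):
    """Hypothetical stand-in, not the real BasePersistence."""

    def __init__(self):
        self._last_position = None

    def commit_position(self, last_position):
        # Keep a copy so later changes made by the caller do not
        # alter the committed value.
        self._last_position = copy.deepcopy(last_position)

    def get_last_position(self):
        return self._last_position


persistence = InMemoryPersistenceSketch()

# A batch counter as the position.
persistence.commit_position(3)
batch_position = persistence.get_last_position()

# A dict of per-partition offsets as the position (values invented).
persistence.commit_position({'partition-0': 1500, 'partition-1': 1498})
offsets_position = persistence.get_last_position()
```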

exporters.persistence.base_sqlalchemy_persistence module

exporters.persistence.persistence_config_dispatcher module

class exporters.persistence.persistence_config_dispatcher.PersistenceConfigDispatcher(uri)[source]

Bases: object

config
get_module_from_uri()[source]
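
One way a dispatcher of this kind could pick a persistence class is by looking at the URI scheme. The sketch below is an assumption, not the library's implementation: the class names come from this page, but the `SCHEME_TO_CLASS` table and `class_name_for_uri` are hypothetical.

```python
import re

# Hypothetical scheme table; the mapping itself is assumed.
SCHEME_TO_CLASS = {
    'mysql': 'MysqlPersistence',
    'postgresql': 'PostgresqlPersistence',
    'sqlite': 'SqlitePersistence',
    'pickle': 'PicklePersistence',
}


def class_name_for_uri(uri):
    """Return the persistence class name whose scheme prefixes the URI."""
    scheme = re.match(r'([a-z]+):', uri)
    if scheme is None or scheme.group(1) not in SCHEME_TO_CLASS:
        raise ValueError('Unsupported persistence URI: %r' % uri)
    return SCHEME_TO_CLASS[scheme.group(1)]


pickle_class = class_name_for_uri('pickle:/tmp/job-123')
sqlite_class = class_name_for_uri('sqlite://jobs.db:7')
```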

exporters.persistence.pickle_persistence module

class exporters.persistence.pickle_persistence.PicklePersistence(*args, **kwargs)[source]

Bases: exporters.persistence.base_persistence.BasePersistence

Manages persistence using the pickle module (loading and dumping) as a backend.

  • file_path (str)
    Path to store the pickle file
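
As a rough illustration of pickle-backed persistence, the sketch below dumps a position to a file and loads it back. The helper names, the file name and the `{'last_position': ...}` layout are assumptions made for the example; the actual file format used by `PicklePersistence` is not described here.

```python
import os
import pickle
import tempfile


def save_position(path, position):
    # Hypothetical layout: a dict holding only the last position.
    with open(path, 'wb') as handle:
        pickle.dump({'last_position': position}, handle)


def load_position(path):
    with open(path, 'rb') as handle:
        return pickle.load(handle)['last_position']


# Write to a throwaway directory so the example leaves no stray files
# in the working directory.
state_dir = tempfile.mkdtemp()
state_file = os.path.join(state_dir, 'job-1.pickle')

save_position(state_file, 10)
restored = load_position(state_file)
```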
close()[source]
commit_position(last_position=None)[source]
static configuration_from_uri(uri, uri_regex)[source]

Returns a configuration object.

delete()[source]
generate_new_job()[source]
get_last_position()[source]
supported_options = {'file_path': {'default': '.', 'type': (<type 'basestring'>,)}}
uri_regex = 'pickle:(([a-zA-Z\\d-]|\\/)+)'
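
The character class in `uri_regex` accepts only letters, digits, hyphens and slashes. The sketch below applies the documented pattern with the standard `re` module to two invented URIs, showing that a path containing other characters (such as `_` or `.`) is only matched up to the first of them.

```python
import re

# Pattern copied from PicklePersistence.uri_regex above.
PICKLE_URI_RE = r'pickle:(([a-zA-Z\d-]|\/)+)'

# Path made only of allowed characters: captured in full.
full_path = re.match(PICKLE_URI_RE, 'pickle:/tmp/job-123').group(1)

# Path with '_' and '.': the capture stops before the underscore.
partial_path = re.match(PICKLE_URI_RE, 'pickle:/tmp/job_1.pickle').group(1)
```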

Module contents