Modules¶
Every module has a supported_options attribute that defines which options are optional or mandatory, and their default values where applicable. It is a dict that maps each option name to a dict of attributes.
Possible attributes for a supported_option are:
- type - The option type. In addition to the standard Python types (basestring, int, list, dict...), exporters provides homogeneous list types in exporters.utils called str_list, int_list and dict_list. These types indicate that every member of the list must be castable to a string, integer or dictionary respectively.
- default - Default option value, used if the option is not provided by the configuration object. If it is present, the supported_option is optional instead of mandatory.
- env_fallback - If the option is not provided by the configuration object, it is loaded from the environment variable named by env_fallback.
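As an illustration, the attributes above could be combined as in the following sketch. The option names, the MY_APIKEY variable and the resolve_option helper are hypothetical and not part of exporters, and the lookup order (configuration, then environment, then default) is an assumption.

```python
import os

# Hypothetical options for an imaginary module; only the attribute names
# (type, default, env_fallback) come from the description above.
supported_options = {
    'batch_size': {'type': int, 'default': 100},           # optional
    'apikey': {'type': str, 'env_fallback': 'MY_APIKEY'},  # environment fallback
    'topic': {'type': str},                                # mandatory
}


def resolve_option(name, spec, configured, environ=os.environ):
    """Illustrative lookup: configuration, then environment, then default."""
    env_name = spec.get('env_fallback')
    if name in configured:
        value = configured[name]
    elif env_name is not None and env_name in environ:
        value = environ[env_name]
    elif 'default' in spec:
        value = spec['default']
    else:
        raise ValueError('Option %s is mandatory' % name)
    if not isinstance(value, spec['type']):
        raise TypeError('Option %s has the wrong type' % name)
    return value
```

An option without a default and without a usable fallback is treated as mandatory, which mirrors the rule stated for the default attribute.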
Export Manager¶
This module handles the pipeline iteration. A pipeline iteration usually consists of calling the reader to get a batch, filtering it, transforming it, filtering it again, writing it and committing the read batch. It should also be in charge of notifications and retry management.
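The iteration described above can be sketched as a simple loop. This is a minimal illustration, not the export manager's code: ListReader and run_pipeline are hypothetical names, and the filter, transform, write and commit steps are passed in as plain callables.

```python
class ListReader(object):
    """Hypothetical reader that serves items from a list in batches."""

    def __init__(self, items, batch_size):
        self.items = items
        self.batch_size = batch_size
        self.position = 0
        self.finished = False

    def get_next_batch(self):
        batch = self.items[self.position:self.position + self.batch_size]
        self.position += len(batch)
        if self.position >= len(self.items):
            self.finished = True
        return batch


def run_pipeline(reader, filter_before, transform, filter_after, write, commit):
    """One iteration per batch: read, filter, transform, filter, write, commit."""
    while not reader.finished:
        batch = reader.get_next_batch()
        batch = [item for item in batch if filter_before(item)]
        batch = [transform(item) for item in batch]
        batch = [item for item in batch if filter_after(item)]
        write(batch)
        # Record how far the reader got once the batch has been written.
        commit(reader.position)
```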
Provided exporters¶
BasicExporter¶
-
class
exporters.export_managers.basic_exporter.
BasicExporter
(configuration)[source]¶ Bases:
exporters.export_managers.base_exporter.BaseExporter
Basic export manager that reads its configuration from a JSON file. It has bypass support.
Bypass support¶
The exporters architecture provides support for bypassing the pipeline. One example is the case in which both reader and writer target S3 buckets. If no transforms or filtering are needed, keys can be copied directly without downloading them.
All bypass classes are subclasses of the BaseBypass class, and must implement three methods:
- meets_conditions(configuration)
- Checks if provided export configuration meets the requirements to use the bypass. If not, it returns False.
- execute()
- Executes the bypass script.
- close()
- Perform all needed actions to leave a clean system after the bypass execution.
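The three methods could fit together as in the sketch below. BaseBypassSketch stands in for BaseBypass so the example runs without exporters installed. CopyFileBypass, the layout of the configuration dict and the use of a classmethod for meets_conditions are all assumptions made for illustration.

```python
import shutil
import tempfile


class BaseBypassSketch(object):
    """Stand-in for BaseBypass; the real base class lives in exporters."""

    def __init__(self, config, metadata):
        self.config = config
        self.metadata = metadata


class CopyFileBypass(BaseBypassSketch):
    """Hypothetical bypass for a filesystem-to-filesystem export."""

    def __init__(self, config, metadata):
        super(CopyFileBypass, self).__init__(config, metadata)
        self.tmp_dir = None

    @classmethod
    def meets_conditions(cls, configuration):
        # Assumed layout: one dict per module section, keyed by section name.
        uses_fs = (configuration.get('reader', {}).get('name') == 'FSReader'
                   and configuration.get('writer', {}).get('name') == 'FSWriter')
        untouched = not any(configuration.get(section)
                            for section in ('filter', 'transform', 'grouper'))
        return uses_fs and untouched

    def execute(self):
        # A real bypass would move the data here; this one only
        # allocates a scratch directory to show the life cycle.
        self.tmp_dir = tempfile.mkdtemp()

    def close(self):
        # Leave a clean system: remove whatever execute() created.
        if self.tmp_dir is not None:
            shutil.rmtree(self.tmp_dir)
            self.tmp_dir = None
```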
Provided Bypass scripts¶
S3Bypass¶
-
exception
exporters.bypasses.s3_to_s3_bypass.
InvalidKeyIntegrityCheck
[source]¶ Bases:
exceptions.Exception
Exception thrown when two s3 keys have different md5 checksums
-
class
exporters.bypasses.s3_to_s3_bypass.
S3Bypass
(config, metadata)[source]¶ Bases:
exporters.bypasses.base_s3_bypass.BaseS3Bypass
Bypass executed by default when data source and data destination are S3 buckets. It should be transparent to user. Conditions are:
- S3Reader and S3Writer are used on configuration.
- No filter modules are set up.
- No transform module is set up.
- No grouper module is set up.
- S3Writer has no items_limit set in configuration.
- S3Writer uses the default items_per_buffer_write and size_per_buffer_write.
- S3Writer uses the default write_buffer.
This bypass tries to directly copy the S3 keys between the read and write buckets. If this is not possible due to permission issues, it will download the key from the read bucket and directly upload it to the write bucket.
S3ToAzureBlobBypass¶
-
class
exporters.bypasses.s3_to_azure_blob_bypass.
S3AzureBlobBypass
(config, metadata)[source]¶ Bases:
exporters.bypasses.base_s3_bypass.BaseS3Bypass
Bypass executed by default when data source is an S3 bucket and data destination is an Azure blob container. It should be transparent to user. Conditions are:
- S3Reader and AzureBlobWriter are used on configuration.
- No filter modules are set up.
- No transform module is set up.
- No grouper module is set up.
- AzureBlobWriter has no items_limit set in configuration.
- AzureBlobWriter uses the default items_per_buffer_write and size_per_buffer_write.
- AzureBlobWriter has default write_buffer.
S3ToAzureFileBypass¶
-
class
exporters.bypasses.s3_to_azure_file_bypass.
S3AzureFileBypass
(config, metadata)[source]¶ Bases:
exporters.bypasses.base_s3_bypass.BaseS3Bypass
Bypass executed by default when data source is an S3 bucket and data destination is an Azure share. It should be transparent to user. Conditions are:
- S3Reader and AzureFileWriter are used on configuration.
- No filter modules are set up.
- No transform module is set up.
- No grouper module is set up.
- AzureFileWriter has no items_limit set in configuration.
- AzureFileWriter uses the default items_per_buffer_write and size_per_buffer_write.
- AzureFileWriter has default write_buffer.
StreamBypass¶
-
class
exporters.bypasses.stream_bypass.
Stream
(filename, size, meta)¶ Bases:
tuple
-
filename
¶ Alias for field number 0
-
meta
¶ Alias for field number 2
-
size
¶ Alias for field number 1
-
-
class
exporters.bypasses.stream_bypass.
StreamBypass
(config, metadata)[source]¶ Bases:
exporters.bypasses.base.BaseBypass
Generic Bypass that streams the contents of the files instead of running them through the export pipeline.
It should be transparent to user. Conditions are:
- Reader module supports get_read_streams
- Writer module supports write_stream
- No filter modules are set up.
- No transform module is set up.
- No grouper module is set up.
- Writer has no items_limit set in configuration.
- Writer uses the default items_per_buffer_write and size_per_buffer_write.
- writer has default write_buffer.
Reader¶
There are two kinds of readers, stream readers and structured readers. Stream readers read a stream of data from a binary backend, while in structured readers the underlying storage stores structured items.
Stream readers have to be combined with a decompressor and a deserializer to read the items from them (by default, ZLibDecompressor and JsonLinesDeserializer are used).
Readers are in charge of providing batches of items to the pipeline. All readers are subclasses of BaseReader class.
- Structured readers must implement:
- get_next_batch()
- This method is called from the manager. It must return a list or a generator of BaseRecord objects. When it has nothing else to read, it must set class variable “finished” to True.
- Stream readers must implement:
- get_read_streams()
- This should be an iterator that yields streams (file-like objects).
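The two contracts above might look like this in a minimal form. Both classes are hypothetical and deliberately do not subclass BaseReader, so the sketch runs on its own. Plain dicts stand in for BaseRecord objects.

```python
import io


class CountingReader(object):
    """Hypothetical structured reader that produces numbered items."""

    def __init__(self, total, batch_size):
        self.total = total
        self.batch_size = batch_size
        self.read = 0
        self.finished = False

    def get_next_batch(self):
        size = min(self.batch_size, self.total - self.read)
        batch = [{'id': number} for number in range(self.read, self.read + size)]
        self.read += size
        # Signal the manager that there is nothing else to read.
        if self.read >= self.total:
            self.finished = True
        return batch


class InMemoryStreamReader(object):
    """Hypothetical stream reader that serves byte strings as streams."""

    def __init__(self, blobs):
        self.blobs = blobs

    def get_read_streams(self):
        for blob in self.blobs:
            yield io.BytesIO(blob)  # file-like object
```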
-
class
exporters.readers.base_reader.
BaseReader
(options, metadata)[source]¶ Bases:
exporters.pipeline.base_pipeline_item.BasePipelineItem
This module reads items and groups them into batches to pass to the pipeline.
-
get_next_batch
()[source]¶ This method is called from the manager. It must return a list or a generator of BaseRecord objects. When it has nothing else to read, it must set class variable “finished” to True.
-
set_last_position
(last_position)[source]¶ Called from the manager, it is in charge of updating the last position of data committed by the writer, in order to provide resume support
-
supported_options
= {}¶
-
-
class
exporters.readers.base_stream_reader.
StreamBasedReader
(*args, **kwargs)[source]¶ Bases:
exporters.readers.base_reader.BaseReader
Abstract reader for storage backends that operate on bytes instead of JSON objects.
The bytes are first decompressed using a decompressor and then deserialized using a deserializer.
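The decompress-then-deserialize step can be illustrated with the standard library alone. The sketch below does not use the exporters decompressor or deserializer classes. iter_items is a hypothetical helper, and it assumes gzip-compressed input with one JSON item per line.

```python
import gzip
import io
import json
import zlib


def iter_items(stream, chunk_size=8192):
    """Decompress a gzip stream incrementally and yield one item per JSON line."""
    # 16 + MAX_WBITS tells zlib to expect a gzip header and trailer.
    decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)
    pending = b''
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        pending += decompressor.decompress(chunk)
        lines = pending.split(b'\n')
        pending = lines.pop()  # the last piece may be an incomplete line
        for line in lines:
            if line.strip():
                yield json.loads(line.decode('utf-8'))
    # Emit whatever is left once the input is exhausted.
    for line in (pending + decompressor.flush()).split(b'\n'):
        if line.strip():
            yield json.loads(line.decode('utf-8'))


payload = gzip.compress(b'{"a": 1}\n{"a": 2}\n')
items = list(iter_items(io.BytesIO(payload), chunk_size=5))
```

Reading in small chunks keeps memory use bounded, which matters when the underlying files are large.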
- Available Options:
- batch_size (int)
- Number of items to be returned in each batch
-
decompressor
= <exporters.decompressors.ZLibDecompressor object>¶
-
deserializer
= <exporters.deserializers.JsonLinesDeserializer object>¶
-
get_next_batch
()[source]¶ This method is called from the manager. It must return a list or a generator of BaseRecord objects. When it has nothing else to read, it must set class variable “finished” to True.
-
set_last_position
(last_position)[source]¶ Called from the manager, it is in charge of updating the last position of data committed by the writer, in order to provide resume support
-
supported_options
= {'batch_size': {'default': 10000, 'type': (<type 'int'>, <type 'long'>)}}¶
Provided readers¶
RandomReader¶
Random items generator, just for testing purposes
-
class
exporters.readers.random_reader.
RandomReader
(*args, **kwargs)[source]¶ Bases:
exporters.readers.base_reader.BaseReader
It is just a reader for testing purposes. It generates the amount of random data set in its config section.
- number_of_items (int)
- Number of total items that must be returned by the reader before finishing.
- batch_size (int)
- Number of items to be returned in each batch.
-
get_next_batch
()[source]¶ This method is called from the manager. It must return a list or a generator of BaseRecord objects. When it has nothing else to read, it must set class variable “finished” to True.
-
set_last_position
(last_position)[source]¶ Called from the manager, it is in charge of updating the last position of data committed by the writer, in order to provide resume support
-
supported_options
= {'batch_size': {'default': 100, 'type': (<type 'int'>, <type 'long'>)}, 'number_of_items': {'default': 1000, 'type': (<type 'int'>, <type 'long'>)}}¶
FSReader (Stream)¶
-
class
exporters.readers.fs_reader.
FSReader
(*args, **kwargs)[source]¶ Bases:
exporters.readers.base_stream_reader.StreamBasedReader
Reads items from gzip-compressed files located in the filesystem under a common path.
- input (str/dict or a list of str/dict)
Specification of files to be read.
Accepts either one “input_unit” or many of them in a list. “input_unit” is defined as follows:
If a string, it indicates a filename, e.g. “/path/to/filename”.
If a dictionary, it indicates a directory to be read with the following elements:
“dir”: path to directory, e.g. “/path/to/dir”.
“dir_pointer”: path to file containing path to directory, e.g. “/path/to/pointer/file” which contains “/path/to/dir”. Cannot be used together with “dir”.
For example:
We have a weekly export scheduled with cron. If we wanted to point to a new data path every week, we would have to keep updating the export configuration. With a pointer, we can set the reader to read from that file, which contains one line with a valid path to the datasets, so only the pointer file needs to be updated.
“pattern”: (optional) regular expression to filter filenames, e.g. “output.*.jl.gz$”
-
supported_options
= {'input': {'default': {'dir': ''}, 'type': (<type 'str'>, <type 'dict'>, <type 'list'>)}, 'batch_size': {'default': 10000, 'type': (<type 'int'>, <type 'long'>)}}¶
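The accepted forms of the input option can be written out as follows, reusing the example paths from the description. The last lines show how a pattern narrows down filenames. The sample filenames are made up, and the use of re.search is an assumption about how the reader applies the expression.

```python
import re

# A single file, given as a string.
input_single = '/path/to/filename'

# A directory plus an optional filename pattern.
input_dir = {'dir': '/path/to/dir', 'pattern': 'output.*.jl.gz$'}

# A directory resolved through a pointer file ("dir" must not be set).
input_pointer = {'dir_pointer': '/path/to/pointer/file'}

# Several input units combined in a list.
input_many = [input_single, input_dir, input_pointer]

# Effect of the pattern on a few made-up filenames.
filenames = ['output-0001.jl.gz', 'output-0001.jl.gz.tmp', 'notes.txt']
selected = [name for name in filenames
            if re.search(input_dir['pattern'], name)]
```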
KafkaScannerReader¶
Kafka reader
-
class
exporters.readers.kafka_scanner_reader.
KafkaScannerReader
(*args, **kwargs)[source]¶ Bases:
exporters.readers.base_reader.BaseReader
This reader retrieves items from kafka brokers.
- batch_size (int)
- Number of items to be returned in each batch
- brokers (list)
- List of broker URIs.
- topic (str)
- Topic to read from.
- partitions (list)
- Partitions to read from.
-
get_next_batch
()[source]¶ This method is called from the manager. It must return a list or a generator of BaseRecord objects. When it has nothing else to read, it must set class variable “finished” to True.
-
set_last_position
(last_position)[source]¶ Called from the manager, it is in charge of updating the last position of data committed by the writer, in order to provide resume support
-
supported_options
= {'batch_size': {'default': 10000, 'type': (<type 'int'>, <type 'long'>)}, 'topic': {'type': (<type 'basestring'>,)}, 'ssl_configs': {'default': None, 'type': <type 'dict'>}, 'brokers': {'type': <class 'exporters.utils.list[unicode]'>}, 'partitions': {'default': None, 'type': <class 'exporters.utils.list[int]'>}}¶
KafkaRandomReader¶
Kafka random reader
-
class
exporters.readers.kafka_random_reader.
KafkaRandomReader
(*args, **kwargs)[source]¶ Bases:
exporters.readers.base_reader.BaseReader
This reader retrieves a random subset of items from kafka brokers.
- record_count (int)
- Number of items to be returned in total
- batch_size (int)
- Number of items to be returned in each batch
- brokers (list)
- List of broker URIs.
- topic (str)
- Topic to read from.
- group (str)
- Reading group for kafka client.
-
decompress_messages
(offmsgs)[source]¶ Decompress pre-defined compressed fields for each message. Msgs should be unpacked before this step.
-
get_next_batch
()[source]¶ This method is called from the manager. It must return a list or a generator of BaseRecord objects. When it has nothing else to read, it must set class variable “finished” to True.
-
set_last_position
(last_position)[source]¶ Called from the manager, it is in charge of updating the last position of data committed by the writer, in order to provide resume support
-
supported_options
= {'record_count': {'type': (<type 'int'>, <type 'long'>)}, 'group': {'type': (<type 'basestring'>,)}, 'batch_size': {'default': 10000, 'type': (<type 'int'>, <type 'long'>)}, 'topic': {'type': (<type 'basestring'>,)}, 'brokers': {'type': <class 'exporters.utils.list[unicode]'>}}¶
S3Reader (Stream)¶
-
class
exporters.readers.s3_reader.
S3BucketKeysFetcher
(reader_options, aws_access_key_id, aws_secret_access_key)[source]¶ Bases:
object
-
class
exporters.readers.s3_reader.
S3Reader
(*args, **kwargs)[source]¶ Bases:
exporters.readers.base_stream_reader.StreamBasedReader
Reads items from gzip-compressed keys located in S3 buckets under a common prefix.
- bucket (str)
Name of the bucket to retrieve items from.
- aws_access_key_id (str)
Public access key to the s3 bucket.
- aws_secret_access_key (str)
Secret access key to the s3 bucket.
- prefix (str)
Prefix of s3 keys to be read.
- prefix_pointer (str)
Prefix pointing to the last version of dataset. This adds support for regular exports. For example:
We have a weekly export scheduled with cron. If we wanted to point to a new data prefix every week, we would have to keep updating the export configuration. With a pointer, we can set the reader to read from that key, which contains one or several lines with valid prefixes to datasets, so only the pointer file needs to be updated.
- pattern (str)
S3 key name pattern (REGEX). All files that don’t match this regex string will be discarded by the reader.
-
supported_options
= {'pattern': {'default': None, 'type': (<type 'basestring'>,)}, 'bucket': {'type': (<type 'basestring'>,)}, 'batch_size': {'default': 10000, 'type': (<type 'int'>, <type 'long'>)}, 'prefix': {'default': '', 'type': (<type 'basestring'>, <type 'list'>)}, 'prefix_format_using_date': {'default': None, 'type': (<type 'basestring'>, <type 'tuple'>, <type 'list'>)}, 'aws_secret_access_key': {'type': (<type 'basestring'>,), 'env_fallback': 'EXPORTERS_S3READER_AWS_SECRET'}, 'prefix_pointer': {'default': None, 'type': (<type 'basestring'>,)}, 'aws_access_key_id': {'type': (<type 'basestring'>,), 'env_fallback': 'EXPORTERS_S3READER_AWS_KEY'}}¶
HubstorageReader¶
-
class
exporters.readers.hubstorage_reader.
HubstorageReader
(*args, **kwargs)[source]¶ Bases:
exporters.readers.base_reader.BaseReader
This reader retrieves items from Scrapinghub Hubstorage collections.
- batch_size (int)
- Number of items to be returned in each batch
- apikey (str)
- API key with access to the project where the items are being generated.
- project_id (int or str)
- Id of the project.
- collection_name (str)
- Name of the collection of items.
- count (int)
- Number of records to read from collection.
- prefixes (list)
- Only include records with given key prefixes.
- exclude_prefixes (list)
- Exclude records with given key prefixes.
- secondary_collections (list)
- A list of secondary collections to merge from.
- startts (int or str)
- Either milliseconds since epoch, or date string.
- endts (int or str)
- Either milliseconds since epoch, or date string.
-
get_next_batch
()[source]¶ This method is called from the manager. It must return a list or a generator of BaseRecord objects. When it has nothing else to read, it must set class variable “finished” to True.
-
set_last_position
(last_position)[source]¶ Called from the manager, it is in charge of updating the last position of data committed by the writer, in order to provide resume support
-
supported_options
= {'count': {'default': 0, 'type': (<type 'int'>, <type 'long'>)}, 'secondary_collections': {'default': [], 'type': <class 'exporters.utils.list[unicode]'>}, 'apikey': {'type': (<type 'basestring'>,), 'env_fallback': 'EXPORTERS_HS_APIKEY'}, 'collection_name': {'type': (<type 'basestring'>,)}, 'startts': {'default': None, 'type': (<type 'int'>, <type 'long'>, <type 'basestring'>)}, 'batch_size': {'default': 10000, 'type': (<type 'int'>, <type 'long'>)}, 'prefixes': {'default': [], 'type': <class 'exporters.utils.list[unicode]'>}, 'exclude_prefixes': {'default': [], 'type': <class 'exporters.utils.list[unicode]'>}, 'endts': {'default': None, 'type': (<type 'int'>, <type 'long'>, <type 'basestring'>)}, 'project_id': {'type': (<type 'int'>, <type 'long'>, <type 'basestring'>)}}¶
Writer¶
Writers are in charge of writing batches of items to the final destination. All writers are subclasses of the BaseWriter class, and must implement:
- write(dump_path, group_key=None)
- The manager calls this method. It consumes a dump_path, which is the path of an items buffer file compressed with gzip. It also has an optional group_key, which provides information regarding the group membership of the items contained in that file.
All writers also have the following common options:
- items_per_buffer_write
- Number of items to be written before a buffer flush takes place.
- size_per_buffer_write
- Size of buffer files before being flushed.
- items_limit
- Number of items to be written before ending the export process. This is useful for testing exports.
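A writer's write(dump_path, group_key=None) method might consume a buffer file as sketched here. CollectingWriter is hypothetical and does not subclass BaseWriter. The one-JSON-item-per-line layout of the buffer file and the list form of group_key are assumptions.

```python
import gzip
import json
import os
import tempfile


class CollectingWriter(object):
    """Hypothetical writer that keeps written items in memory, per group."""

    def __init__(self):
        self.written = {}

    def write(self, dump_path, group_key=None):
        # dump_path points to a gzip-compressed buffer file.
        with gzip.open(dump_path, 'rb') as dump:
            items = [json.loads(line.decode('utf-8'))
                     for line in dump if line.strip()]
        self.written.setdefault(tuple(group_key or ()), []).extend(items)


# Build a small buffer file and hand it to the writer.
handle, path = tempfile.mkstemp(suffix='.jl.gz')
os.close(handle)
with gzip.open(path, 'wb') as dump:
    dump.write(b'{"id": 1}\n{"id": 2}\n')
writer = CollectingWriter()
writer.write(path, group_key=['us'])
os.remove(path)
```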
-
class
exporters.writers.base_writer.
BaseWriter
(options, metadata, *args, **kwargs)[source]¶ Bases:
exporters.pipeline.base_pipeline_item.BasePipelineItem
This module receives a batch and writes it where needed.
-
finish_writing
()[source]¶ This method is a hook for operations to be done after everything has been written (e.g. consistency checks, writing a checkpoint, etc).
The default implementation calls self._check_write_consistency if option check_consistency is True.
-
grouping_info
¶
-
hash_algorithm
= None¶
-
supported_options
= {'write_buffer': {'default': 'exporters.write_buffers.base.WriteBuffer', 'type': (<type 'basestring'>,)}, 'compression': {'default': 'gz', 'type': (<type 'basestring'>,)}, 'items_per_buffer_write': {'default': 500000, 'type': (<type 'int'>, <type 'long'>)}, 'size_per_buffer_write': {'default': 4000000000, 'type': (<type 'int'>, <type 'long'>)}, 'items_limit': {'default': 0, 'type': (<type 'int'>, <type 'long'>)}, 'check_consistency': {'default': False, 'type': <type 'bool'>}, 'write_buffer_options': {'default': {}, 'type': <type 'dict'>}}¶
-
-
exception
exporters.writers.base_writer.
InconsistentWriteState
[source]¶ Bases:
exceptions.Exception
This exception is thrown when write state is inconsistent with expected final state
-
exception
exporters.writers.base_writer.
ItemsLimitReached
[source]¶ Bases:
exceptions.Exception
This exception is thrown when the desired items number has been reached
Provided writers¶
ConsoleWriter¶
-
class
exporters.writers.console_writer.
ConsoleWriter
(options, *args, **kwargs)[source]¶ Bases:
exporters.writers.base_writer.BaseWriter
It is just a writer for testing purposes. It prints every item to the console.
It has no other options.
-
supported_options
= {'write_buffer': {'default': 'exporters.write_buffers.base.WriteBuffer', 'type': (<type 'basestring'>,)}, 'compression': {'default': 'gz', 'type': (<type 'basestring'>,)}, 'items_per_buffer_write': {'default': 500000, 'type': (<type 'int'>, <type 'long'>)}, 'size_per_buffer_write': {'default': 4000000000, 'type': (<type 'int'>, <type 'long'>)}, 'items_limit': {'default': 0, 'type': (<type 'int'>, <type 'long'>)}, 'check_consistency': {'default': False, 'type': <type 'bool'>}, 'write_buffer_options': {'default': {}, 'type': <type 'dict'>}}¶
-
S3Writer¶
-
class
exporters.writers.s3_writer.
S3Writer
(options, *args, **kwargs)[source]¶ Bases:
exporters.writers.filebase_base_writer.FilebaseBaseWriter
Writes items to S3 bucket. It is a File Based writer, so it has filebase option available
- bucket (str)
- Name of the bucket to write the items to.
- aws_access_key_id (str)
- Public access key to the s3 bucket.
- aws_secret_access_key (str)
- Secret access key to the s3 bucket.
- filebase (str)
- Base path to store the items in the bucket.
- aws_region (str)
- AWS region to connect to.
- save_metadata (bool)
- Save key’s items count as metadata. Default: True
- filebase
- Path to store the exported files
-
supported_options
= {'filebase': {'type': (<type 'basestring'>,)}, 'write_buffer': {'default': 'exporters.write_buffers.base.WriteBuffer', 'type': (<type 'basestring'>,)}, 'aws_region': {'default': None, 'type': (<type 'basestring'>,)}, 'save_metadata': {'default': True, 'required': False, 'type': <type 'bool'>}, 'host': {'default': None, 'type': (<type 'basestring'>,)}, 'items_limit': {'default': 0, 'type': (<type 'int'>, <type 'long'>)}, 'aws_access_key_id': {'type': (<type 'basestring'>,), 'env_fallback': 'EXPORTERS_S3WRITER_AWS_LOGIN'}, 'start_file_count': {'default': 0, 'type': <type 'int'>}, 'compression': {'default': 'gz', 'type': (<type 'basestring'>,)}, 'items_per_buffer_write': {'default': 500000, 'type': (<type 'int'>, <type 'long'>)}, 'bucket': {'type': (<type 'basestring'>,)}, 'generate_md5': {'default': False, 'type': <type 'bool'>}, 'size_per_buffer_write': {'default': 4000000000, 'type': (<type 'int'>, <type 'long'>)}, 'aws_secret_access_key': {'type': (<type 'basestring'>,), 'env_fallback': 'EXPORTERS_S3WRITER_AWS_SECRET'}, 'save_pointer': {'default': None, 'type': (<type 'basestring'>,)}, 'check_consistency': {'default': False, 'type': <type 'bool'>}, 'write_buffer_options': {'default': {}, 'type': <type 'dict'>}}¶
FTPWriter¶
-
class
exporters.writers.ftp_writer.
FTPWriter
(options, *args, **kwargs)[source]¶ Bases:
exporters.writers.filebase_base_writer.FilebaseBaseWriter
Writes items to FTP server. It is a File Based writer, so it has filebase option available
- host (str)
- Ftp server ip
- port (int)
- Ftp port
- ftp_user (str)
- Ftp user
- ftp_password (str)
- Ftp password
- filebase (str)
- Path to store the exported files
-
supported_options
= {'filebase': {'type': (<type 'basestring'>,)}, 'write_buffer': {'default': 'exporters.write_buffers.base.WriteBuffer', 'type': (<type 'basestring'>,)}, 'host': {'type': (<type 'basestring'>,)}, 'items_limit': {'default': 0, 'type': (<type 'int'>, <type 'long'>)}, 'port': {'default': 21, 'type': (<type 'int'>, <type 'long'>)}, 'start_file_count': {'default': 0, 'type': <type 'int'>}, 'ftp_user': {'type': (<type 'basestring'>,), 'env_fallback': 'EXPORTERS_FTP_USER'}, 'compression': {'default': 'gz', 'type': (<type 'basestring'>,)}, 'items_per_buffer_write': {'default': 500000, 'type': (<type 'int'>, <type 'long'>)}, 'ftp_password': {'type': (<type 'basestring'>,), 'env_fallback': 'EXPORTERS_FTP_PASSWORD'}, 'generate_md5': {'default': False, 'type': <type 'bool'>}, 'size_per_buffer_write': {'default': 4000000000, 'type': (<type 'int'>, <type 'long'>)}, 'check_consistency': {'default': False, 'type': <type 'bool'>}, 'write_buffer_options': {'default': {}, 'type': <type 'dict'>}}¶
SFTPWriter¶
-
class
exporters.writers.sftp_writer.
SFTPWriter
(options, *args, **kwargs)[source]¶ Bases:
exporters.writers.filebase_base_writer.FilebaseBaseWriter
Writes items to SFTP server. It is a File Based writer, so it has filebase option available
- host (str)
- SFtp server ip
- port (int)
- SFtp port
- sftp_user (str)
- SFtp user
- sftp_password (str)
- SFtp password
- filebase (str)
- Path to store the exported files
-
supported_options
= {'filebase': {'type': (<type 'basestring'>,)}, 'write_buffer': {'default': 'exporters.write_buffers.base.WriteBuffer', 'type': (<type 'basestring'>,)}, 'host': {'type': (<type 'basestring'>,)}, 'items_limit': {'default': 0, 'type': (<type 'int'>, <type 'long'>)}, 'port': {'default': 22, 'type': (<type 'int'>, <type 'long'>)}, 'start_file_count': {'default': 0, 'type': <type 'int'>}, 'sftp_password': {'type': (<type 'basestring'>,), 'env_fallback': 'EXPORTERS_SFTP_PASSWORD'}, 'compression': {'default': 'gz', 'type': (<type 'basestring'>,)}, 'sftp_user': {'type': (<type 'basestring'>,), 'env_fallback': 'EXPORTERS_SFTP_USER'}, 'items_per_buffer_write': {'default': 500000, 'type': (<type 'int'>, <type 'long'>)}, 'generate_md5': {'default': False, 'type': <type 'bool'>}, 'size_per_buffer_write': {'default': 4000000000, 'type': (<type 'int'>, <type 'long'>)}, 'check_consistency': {'default': False, 'type': <type 'bool'>}, 'write_buffer_options': {'default': {}, 'type': <type 'dict'>}}¶
FSWriter¶
-
class
exporters.writers.fs_writer.
FSWriter
(options, *args, **kwargs)[source]¶ Bases:
exporters.writers.filebase_base_writer.FilebaseBaseWriter
Writes items to local file system files. It is a File Based writer, so it has filebase option available
- filebase (str)
- Path to store the exported files
-
supported_options
= {'filebase': {'type': (<type 'basestring'>,)}, 'write_buffer': {'default': 'exporters.write_buffers.base.WriteBuffer', 'type': (<type 'basestring'>,)}, 'items_limit': {'default': 0, 'type': (<type 'int'>, <type 'long'>)}, 'start_file_count': {'default': 0, 'type': <type 'int'>}, 'compression': {'default': 'gz', 'type': (<type 'basestring'>,)}, 'items_per_buffer_write': {'default': 500000, 'type': (<type 'int'>, <type 'long'>)}, 'generate_md5': {'default': False, 'type': <type 'bool'>}, 'size_per_buffer_write': {'default': 4000000000, 'type': (<type 'int'>, <type 'long'>)}, 'check_consistency': {'default': False, 'type': <type 'bool'>}, 'write_buffer_options': {'default': {}, 'type': <type 'dict'>}}¶
MailWriter¶
-
class
exporters.writers.mail_writer.
MailWriter
(options, *args, **kwargs)[source]¶ Bases:
exporters.writers.base_writer.BaseWriter
Sends emails with item files attached
- emails (list)
- Email addresses where data will be sent
- subject (str)
- Subject of the email
- from (str)
- Sender of the email
- max_mails_sent (int)
- Maximum number of emails that will be sent
-
supported_options
= {'access_key': {'type': (<type 'basestring'>,), 'env_fallback': 'EXPORTERS_MAIL_AWS_ACCESS_KEY'}, 'write_buffer': {'default': 'exporters.write_buffers.base.WriteBuffer', 'type': (<type 'basestring'>,)}, 'from': {'type': (<type 'basestring'>,), 'env_fallback': 'EXPORTERS_MAIL_FROM'}, 'compression': {'default': 'gz', 'type': (<type 'basestring'>,)}, 'items_per_buffer_write': {'default': 500000, 'type': (<type 'int'>, <type 'long'>)}, 'file_name': {'default': None, 'type': (<type 'basestring'>,)}, 'max_mails_sent': {'default': 5, 'type': (<type 'int'>, <type 'long'>)}, 'emails': {'type': <class 'exporters.utils.list[unicode]'>}, 'size_per_buffer_write': {'default': 4000000000, 'type': (<type 'int'>, <type 'long'>)}, 'secret_key': {'type': (<type 'basestring'>,), 'env_fallback': 'EXPORTERS_MAIL_AWS_SECRET_KEY'}, 'items_limit': {'default': 0, 'type': (<type 'int'>, <type 'long'>)}, 'check_consistency': {'default': False, 'type': <type 'bool'>}, 'write_buffer_options': {'default': {}, 'type': <type 'dict'>}, 'subject': {'type': (<type 'basestring'>,)}}¶
AggregationWriter¶
CloudsearchWriter¶
-
class
exporters.writers.cloudsearch_writer.
CloudSearchWriter
(options, *args, **kwargs)[source]¶ Bases:
exporters.writers.base_writer.BaseWriter
This writer stores items in the Amazon Web Services CloudSearch service (https://aws.amazon.com/es/cloudsearch/)
- endpoint_url
- Document Endpoint (e.g.: http://doc-movies-123456789012.us-east-1.cloudsearch.amazonaws.com)
- id_field
- Field to use as identifier
- access_key
- Public access key to the s3 bucket.
- secret_key
- Secret access key to the s3 bucket.
-
supported_options
= {'access_key': {'default': None, 'type': (<type 'basestring'>,), 'env_fallback': 'EXPORTERS_CLOUDSEARCH_ACCESS_KEY'}, 'write_buffer': {'default': 'exporters.write_buffers.base.WriteBuffer', 'type': (<type 'basestring'>,)}, 'compression': {'default': 'gz', 'type': (<type 'basestring'>,)}, 'items_per_buffer_write': {'default': 500000, 'type': (<type 'int'>, <type 'long'>)}, 'id_field': {'default': '_key', 'type': (<type 'basestring'>,), 'help': 'Field to use as identifier'}, 'endpoint_url': {'type': (<type 'basestring'>,), 'help': 'Document Endpoint (e.g.: http://doc-movies-123456789012.us-east-1.cloudsearch.amazonaws.com)'}, 'size_per_buffer_write': {'default': 4000000000, 'type': (<type 'int'>, <type 'long'>)}, 'secret_key': {'default': None, 'type': (<type 'basestring'>,), 'env_fallback': 'EXPORTERS_CLOUDSEARCH_SECRET_KEY'}, 'items_limit': {'default': 0, 'type': (<type 'int'>, <type 'long'>)}, 'check_consistency': {'default': False, 'type': <type 'bool'>}, 'write_buffer_options': {'default': {}, 'type': <type 'dict'>}}¶
-
exporters.writers.cloudsearch_writer.
create_document_batches
(jsonlines, id_field, max_batch_size=5000000)[source]¶ Create batches in expected AWS Cloudsearch format, limiting the byte size per batch according to given max_batch_size
See: http://docs.aws.amazon.com/cloudsearch/latest/developerguide/preparing-data.html
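Size-limited batching of this kind can be sketched as follows. This is not the body of create_document_batches: batch_documents is a hypothetical helper. It emits JSON arrays of "add" operations in the document batch format described in the AWS guide linked above and starts a new batch before the byte limit would be exceeded.

```python
import json


def batch_documents(jsonlines, id_field, max_batch_size=5000000):
    """Yield JSON arrays of "add" operations, each within max_batch_size bytes."""
    batch, size = [], 2  # 2 bytes for the surrounding "[" and "]"
    for line in jsonlines:
        item = json.loads(line)
        doc = json.dumps({'type': 'add', 'id': item[id_field], 'fields': item})
        doc_size = len(doc.encode('utf-8')) + 1  # +1 for the separating comma
        if batch and size + doc_size > max_batch_size:
            yield '[' + ','.join(batch) + ']'
            batch, size = [], 2
        batch.append(doc)
        size += doc_size
    if batch:
        yield '[' + ','.join(batch) + ']'
```

In this sketch a single document larger than max_batch_size still gets a batch of its own, so callers that need a hard limit would have to reject such documents beforehand.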
GDriveWriter¶
-
class
exporters.writers.gdrive_writer.
GDriveWriter
(*args, **kwargs)[source]¶ Bases:
exporters.writers.filebase_base_writer.FilebaseBaseWriter
Writes items to a Google Drive account. It is a File Based writer, so it has filebase option available
- client_secret (object)
- JSON object containing client secrets (client-secret.json) file obtained when creating the google drive API key.
- credentials (object)
- JSON object containing credentials, obtained by authenticating the application using the bin/get_gdrive_credentials.py ds script
- filebase (str)
- Path to store the exported files
-
supported_options
= {'filebase': {'type': (<type 'basestring'>,)}, 'write_buffer': {'default': 'exporters.write_buffers.base.WriteBuffer', 'type': (<type 'basestring'>,)}, 'credentials': {'type': <type 'object'>}, 'items_limit': {'default': 0, 'type': (<type 'int'>, <type 'long'>)}, 'start_file_count': {'default': 0, 'type': <type 'int'>}, 'compression': {'default': 'gz', 'type': (<type 'basestring'>,)}, 'items_per_buffer_write': {'default': 500000, 'type': (<type 'int'>, <type 'long'>)}, 'generate_md5': {'default': False, 'type': <type 'bool'>}, 'size_per_buffer_write': {'default': 4000000000, 'type': (<type 'int'>, <type 'long'>)}, 'client_secret': {'type': <type 'object'>}, 'check_consistency': {'default': False, 'type': <type 'bool'>}, 'write_buffer_options': {'default': {}, 'type': <type 'dict'>}}¶
GStorageWriter¶
-
class
exporters.writers.gstorage_writer.
GStorageWriter
(options, *args, **kwargs)[source]¶ Bases:
exporters.writers.filebase_base_writer.FilebaseBaseWriter
Writes items to Google Storage buckets. It is a File Based writer, so it has filebase option available
- filebase (str)
- Path to store the exported files
- project (str)
- Valid project name
- bucket (str)
- Google Storage bucket name
- credentials (str or dict)
- Object with valid Google credentials, could be set using env variable EXPORTERS_GSTORAGE_CREDS_RESOURCE which should include reference to credentials JSON file installed with setuptools. This reference should have form “package_name:file_path”
-
supported_options
= {'filebase': {'type': (<type 'basestring'>,)}, 'write_buffer': {'default': 'exporters.write_buffers.base.WriteBuffer', 'type': (<type 'basestring'>,)}, 'credentials': {'type': (<type 'dict'>, <type 'basestring'>), 'env_fallback': 'EXPORTERS_GSTORAGE_CREDS_RESOURCE'}, 'items_limit': {'default': 0, 'type': (<type 'int'>, <type 'long'>)}, 'start_file_count': {'default': 0, 'type': <type 'int'>}, 'project': {'type': (<type 'basestring'>,)}, 'compression': {'default': 'gz', 'type': (<type 'basestring'>,)}, 'items_per_buffer_write': {'default': 500000, 'type': (<type 'int'>, <type 'long'>)}, 'bucket': {'type': (<type 'basestring'>,)}, 'generate_md5': {'default': False, 'type': <type 'bool'>}, 'size_per_buffer_write': {'default': 4000000000, 'type': (<type 'int'>, <type 'long'>)}, 'check_consistency': {'default': False, 'type': <type 'bool'>}, 'write_buffer_options': {'default': {}, 'type': <type 'dict'>}}¶
HubstorageReduceWriter¶
-
class
exporters.writers.hs_reduce_writer.
HubstorageReduceWriter
(*args, **kwargs)[source]¶ Bases:
exporters.writers.reduce_writer.ReduceWriter
This writer allows exporters to aggregate item data and push the results into Scrapinghub Hubstorage collections.
- code (str)
- Python code defining a reduce_function(item, accumulator=None)
- collection_url (str)
- Hubstorage Collection URL
- key (str)
- Element key where to push the accumulated result
- apikey (str)
- Hubstorage API key
-
supported_options
= {'write_buffer': {'default': 'exporters.write_buffers.base.WriteBuffer', 'type': (<type 'basestring'>,)}, 'code': {'type': (<type 'basestring'>,), 'help': 'Python code defining a reduce_function(item, accumulator=None)'}, 'key': {'type': (<type 'basestring'>,), 'help': 'Element key where to push the accumulated result'}, 'items_limit': {'default': 0, 'type': (<type 'int'>, <type 'long'>)}, 'collection_url': {'type': (<type 'basestring'>,), 'help': 'Hubstorage Collection URL'}, 'apikey': {'type': (<type 'basestring'>,), 'help': 'Hubstorage API key'}, 'compression': {'default': 'gz', 'type': (<type 'basestring'>,)}, 'items_per_buffer_write': {'default': 500000, 'type': (<type 'int'>, <type 'long'>)}, 'source_path': {'default': None, 'type': (<type 'basestring'>,), 'help': 'Source path, useful for debugging/inspecting tools'}, 'size_per_buffer_write': {'default': 4000000000, 'type': (<type 'int'>, <type 'long'>)}, 'check_consistency': {'default': False, 'type': <type 'bool'>}, 'write_buffer_options': {'default': {}, 'type': <type 'dict'>}}¶
ReduceWriter¶
-
class
exporters.writers.reduce_writer.
ReduceWriter
(*args, **kwargs)[source]¶ Bases:
exporters.writers.base_writer.BaseWriter
This writer allows exporters to aggregate item data and print the results.
- code (str)
- Python code defining a reduce_function(item, accumulator=None)
-
reduced_result
¶
-
supported_options
= {'write_buffer': {'default': 'exporters.write_buffers.base.WriteBuffer', 'type': (<type 'basestring'>,)}, 'code': {'type': (<type 'basestring'>,), 'help': 'Python code defining a reduce_function(item, accumulator=None)'}, 'compression': {'default': 'gz', 'type': (<type 'basestring'>,)}, 'items_per_buffer_write': {'default': 500000, 'type': (<type 'int'>, <type 'long'>)}, 'source_path': {'default': None, 'type': (<type 'basestring'>,), 'help': 'Source path, useful for debugging/inspecting tools'}, 'size_per_buffer_write': {'default': 4000000000, 'type': (<type 'int'>, <type 'long'>)}, 'items_limit': {'default': 0, 'type': (<type 'int'>, <type 'long'>)}, 'check_consistency': {'default': False, 'type': <type 'bool'>}, 'write_buffer_options': {'default': {}, 'type': <type 'dict'>}}¶
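As a minimal sketch of what the code option might contain, the snippet below defines a reduce_function(item, accumulator=None) that counts items per value of a hypothetical country field. The folding loop at the end only illustrates how a writer could call it, assuming the returned value is passed back in as the accumulator on the next call; it is not the writer's actual implementation.

```python
def reduce_function(item, accumulator=None):
    """Count items per 'country' value ('country' is a hypothetical field)."""
    if accumulator is None:
        accumulator = {}
    country = item.get('country', 'unknown')
    accumulator[country] = accumulator.get(country, 0) + 1
    return accumulator


# Illustration only: fold a few sample items the way a reduce writer might.
sample_items = [{'country': 'es'}, {'country': 'us'}, {'country': 'es'}, {}]
reduced_result = None
for sample_item in sample_items:
    reduced_result = reduce_function(sample_item, reduced_result)
```

Starting from None lets the function decide the accumulator type on the first call.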
OdoWriter¶
HubstorageWriter¶
-
class
exporters.writers.hubstorage_writer.
HubstorageWriter
(*args, **kwargs)[source]¶ Bases:
exporters.writers.base_writer.BaseWriter
This writer sends items into Scrapinghub Hubstorage collection.
- apikey (str)
- API key with access to the project where the items are being generated.
- project_id (str)
- Id of the project.
- collection_name (str)
- Name of the collection of items.
- key_field (str)
- Record field which should be used as Hubstorage item key
-
supported_options
= {'write_buffer': {'default': 'exporters.write_buffers.base.WriteBuffer', 'type': (<type 'basestring'>,)}, 'apikey': {'type': (<type 'basestring'>,), 'help': 'Hubstorage API key', 'env_fallback': 'EXPORTERS_HS_APIKEY'}, 'compression': {'default': 'gz', 'type': (<type 'basestring'>,)}, 'items_per_buffer_write': {'default': 500000, 'type': (<type 'int'>, <type 'long'>)}, 'collection_name': {'type': (<type 'basestring'>,), 'help': 'Name of the collection of items'}, 'key_field': {'default': '_key', 'type': (<type 'basestring'>,), 'help': 'Record field which should be used as Hubstorage item key'}, 'project_id': {'type': (<type 'basestring'>,), 'help': 'Id of the project'}, 'size_per_buffer_write': {'default': 4000000000, 'type': (<type 'int'>, <type 'long'>)}, 'items_limit': {'default': 0, 'type': (<type 'int'>, <type 'long'>)}, 'check_consistency': {'default': False, 'type': <type 'bool'>}, 'write_buffer_options': {'default': {}, 'type': <type 'dict'>}}¶
Transform¶
You can apply item transformations as part of an export job. Using this module, read items can be modified or cleaned before being written. To add a new transform module, you must override the following method:
- transform_batch(batch)
- Receives the batch, transforms its items and yields them.
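The transform_batch(batch) contract can be sketched as a generator. The class below is a stand-in that does not import the library (a real module would subclass BaseTransform). The class name, the title field and the use of plain dicts as items are all assumptions made for illustration.

```python
class StripTitleTransform(object):
    """Stand-in transform; a real module would subclass BaseTransform."""

    def transform_batch(self, batch):
        for item in batch:
            # 'title' is a hypothetical field name.
            if 'title' in item:
                item['title'] = item['title'].strip()
            yield item


# Plain dicts stand in for the records of a batch.
batch = [{'title': '  Hello '}, {'id': 2}]
transformed = list(StripTitleTransform().transform_batch(batch))
```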
-
class
exporters.transform.base_transform.
BaseTransform
(options, metadata=None)[source]¶ Bases:
exporters.pipeline.base_pipeline_item.BasePipelineItem
This module receives a batch and transforms its items. It can implement the following methods:
-
supported_options
= {}¶
-
Provided transform¶
NoTransform¶
-
class
exporters.transform.no_transform.
NoTransform
(*args, **kwargs)[source]¶ Bases:
exporters.transform.base_transform.BaseTransform
It leaves the batch as is. This is provided for the cases where no transformations are needed on the original items.
-
supported_options
= {}¶
-
JqTransform¶
-
class
exporters.transform.jq_transform.
JQTransform
(*args, **kwargs)[source]¶ Bases:
exporters.transform.base_transform.BaseTransform
It applies jq transformations to items. To see documentation about possible jq transformations please refer to its official documentation.
- jq_filter (str)
- Valid jq filter
-
supported_options
= {'jq_filter': {'type': (<type 'basestring'>,)}}¶
PythonexpTransform¶
-
class
exporters.transform.pythonexp_transform.
PythonexpTransform
(*args, **kwargs)[source]¶ Bases:
exporters.transform.base_transform.BaseTransform
It applies python expressions to items.
- python_expressions (list)
- A list of valid python expressions
-
supported_options
= {'python_expressions': {'type': <class 'exporters.utils.list[unicode]'>}}¶
PythonmapTransform¶
FlatsonTransform¶
-
class
exporters.transform.flatson_transform.
FlatsonTransform
(*args, **kwargs)[source]¶ Bases:
exporters.transform.base_transform.BaseTransform
It flattens a JSON-like dataset into flat CSV-like tables using the Flatson library. For details, please refer to the official Flatson documentation.
- flatson_schema (dict)
- Valid Flatson schema
-
supported_options
= {'flatson_schema': {'type': <type 'dict'>}}¶
Filter¶
This module receives a batch, filters it according to some parameters, and returns it. It must implement the following method:
- filter(item)
- It receives an item and returns True if the item must be included, or False otherwise
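A minimal sketch of the filter(item) contract follows. The class is a stand-in that does not import the library (a real module would subclass BaseFilter), and the price field and min_price parameter are hypothetical.

```python
class MinPriceFilter(object):
    """Stand-in filter; a real module would subclass BaseFilter."""

    def __init__(self, min_price):
        self.min_price = min_price

    def filter(self, item):
        # True keeps the item, False drops it. 'price' is a hypothetical field.
        return item.get('price', 0) >= self.min_price


price_filter = MinPriceFilter(min_price=10)
batch = [{'price': 5}, {'price': 10}, {'name': 'no price'}, {'price': 42}]
kept = [item for item in batch if price_filter.filter(item)]
```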
-
class
exporters.filters.base_filter.
BaseFilter
(options, metadata)[source]¶ Bases:
exporters.pipeline.base_pipeline_item.BasePipelineItem
This module receives a batch, filters it according to some parameters, and returns it.
-
filter
(item)[source]¶ It receives an item and returns True if the item must be included, or False if not
-
log_at_every
= 1000¶
-
supported_options
= {}¶
-
Provided filters¶
NoFilter¶
KeyValueFilter¶
KeyValueRegex¶
PythonexpFilter¶
-
class
exporters.filters.pythonexp_filter.
PythonexpFilter
(*args, **kwargs)[source]¶ Bases:
exporters.filters.base_filter.BaseFilter
Filters items depending on a python expression. This is NOT secure, so make sure you only use it in contained environments.
- python_expression (str)
- Python expression to filter by
- imports(dict)
- An object with the imports needed by the expressions
-
supported_options
= {'imports': {'default': {}, 'type': <type 'dict'>}, 'python_expression': {'type': (<type 'basestring'>,)}}¶
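To illustrate why expression-based filtering is only suitable for contained environments, here is a rough sketch built on the eval() builtin. It assumes the expression refers to the current record as item. That name, the matches helper and the price field are assumptions, not the filter's confirmed behaviour.

```python
def matches(python_expression, item):
    """Evaluate a filter expression against one item (illustration only)."""
    # eval() executes arbitrary code, so the expression must come from a
    # trusted source. Exposing the record as 'item' is an assumption.
    return bool(eval(python_expression, {'item': item}))


expression = "item['price'] > 10 and item['currency'] == 'EUR'"
items = [
    {'price': 12, 'currency': 'EUR'},
    {'price': 12, 'currency': 'USD'},
    {'price': 3, 'currency': 'EUR'},
]
kept_by_expression = [item for item in items if matches(expression, item)]
```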
Persistence¶
This module is in charge of resume support. It persists the current state of the read and written items, and reports that state on demand. It’s usually called from an export manager, and must implement the following methods:
- get_last_position()
- Returns the last committed position
- commit_position(last_position)
- Commits a position that has been through all the pipeline. Position can be any serializable object. This supports both usual position abstractions (number of batch) or specific abstractions such as offsets in Kafka (which are dicts)
- generate_new_job()
- Creates and instantiates all that is needed to keep persistence (tmp files, remote connections...)
- close()
- Cleans tmp files, closes remote connections...
- configuration_from_uri(uri, regex)
- Returns a configuration object
It must also define a uri_regex to help the module find a previously created resume abstraction.
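The position-related methods listed above can be sketched with an in-memory stand-in. The class below is hypothetical and does not import the library (a real module would subclass BasePersistence). It leaves out configuration_from_uri and uri_regex, and simply keeps the last committed position in a dict.

```python
class InMemoryPersistence(object):
    """Stand-in persistence that keeps its state in memory (hypothetical)."""

    def __init__(self):
        self.state = None

    def generate_new_job(self):
        # A real backend would create tmp files or remote rows here.
        self.state = {'last_position': None}

    def get_last_position(self):
        return self.state['last_position']

    def commit_position(self, last_position):
        # Any serializable object is accepted: a batch number, a dict...
        self.state['last_position'] = last_position

    def close(self):
        self.state = None


persistence = InMemoryPersistence()
persistence.generate_new_job()
persistence.commit_position(3)
position_after_batches = persistence.get_last_position()
persistence.commit_position({'partition-0': 1542})
position_after_offsets = persistence.get_last_position()
persistence.close()
```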
-
class
exporters.persistence.base_persistence.
BasePersistence
(options, metadata)[source]¶ Bases:
exporters.pipeline.base_pipeline_item.BasePipelineItem
Base module for persistence modules
-
commit_position
(last_position)[source]¶ Commits a position that has been through all the pipeline. Position can be any serializable object. This supports both usual position abstractions (number of batch) and specific abstractions such as offsets in Kafka (which are a dict).
-
generate_new_job
()[source]¶ Creates and instantiates all that is needed to keep persistence (tmp files, remote connections...).
-
supported_options
= {}¶
-
Provided persistence¶
PicklePersistence¶
-
class
exporters.persistence.pickle_persistence.
PicklePersistence
(*args, **kwargs)[source]¶ Bases:
exporters.persistence.base_persistence.BasePersistence
Manages persistence using pickle module loading and dumping as a backend.
- file_path (str)
- Path to store the pickle file
-
supported_options
= {'file_path': {'default': '.', 'type': (<type 'basestring'>,)}}¶
-
uri_regex
= 'pickle:(([a-zA-Z\\d-]|\\/)+)'¶
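The uri_regex above can be exercised with the standard re module to see which URIs it accepts. The paths below are made up. Note that the character class only allows letters, digits, hyphens and slashes, so matching stops at the first underscore or dot.

```python
import re

# Same pattern as the uri_regex attribute shown above.
URI_REGEX = 'pickle:(([a-zA-Z\\d-]|\\/)+)'

# Hypothetical URI made only of characters the pattern allows.
file_path = re.match(URI_REGEX, 'pickle:/tmp/export-job-1').group(1)

# Matching stops at the first character outside the allowed set ('_' here).
partial_path = re.match(URI_REGEX, 'pickle:/tmp/job_1.pickle').group(1)
```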
AlchemyPersistence¶
-
class
exporters.persistence.alchemy_persistence.
BaseAlchemyPersistence
(*args, **kwargs)[source]¶ Bases:
exporters.persistence.base_persistence.BasePersistence
-
PROTOCOL
= None¶
-
classmethod
build_db_conn_uri
(**kwargs)[source]¶ Build the database connection URI from the given keyword arguments
-
classmethod
parse_persistence_uri
(persistence_uri)[source]¶ Parse a database URI and the persistence state ID from the given persistence URI
-
persistence_uri_re
= '(?P<proto>[a-z]+)://(?P<user>.+):(?P<password>.+)@(?P<host>.+):(?P<port>\\d+)/(?P<database>.+)/(?P<job_id>\\d+)'¶
-
supported_options
= {'database': {'type': (<type 'basestring'>,)}, 'host': {'type': (<type 'basestring'>,)}, 'user': {'type': (<type 'basestring'>,)}, 'password': {'type': (<type 'basestring'>,)}, 'port': {'type': (<type 'int'>, <type 'long'>)}}¶
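As an illustration of the URI shape that persistence_uri_re expects, the sketch below applies the pattern to a made-up URI with the standard re module. The credentials, host and job id are hypothetical, and this is not necessarily how parse_persistence_uri is implemented.

```python
import re

# Same pattern as the persistence_uri_re attribute shown above.
PERSISTENCE_URI_RE = (
    '(?P<proto>[a-z]+)://(?P<user>.+):(?P<password>.+)@(?P<host>.+)'
    ':(?P<port>\\d+)/(?P<database>.+)/(?P<job_id>\\d+)'
)

# Hypothetical persistence URI; every captured value is a string.
uri = 'mysql://exporter:secret@localhost:3306/exports/42'
parts = re.match(PERSISTENCE_URI_RE, uri).groupdict()
```

The last path segment is the persistence state ID (job_id), while the segment before it is the database name.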
-
-
class
exporters.persistence.alchemy_persistence.
Job
(**kwargs)[source]¶ Bases:
sqlalchemy.ext.declarative.api.Base
-
configuration
¶
-
id
¶
-
job_finished
¶
-
last_committed
¶
-
last_position
¶
-
-
class
exporters.persistence.alchemy_persistence.
MysqlPersistence
(*args, **kwargs)[source]¶ Bases:
exporters.persistence.alchemy_persistence.BaseAlchemyPersistence
Manage export persistence using a mysql database as a backend. It will add a row for every job in a table called Jobs.
- user (str)
Username with access to mysql database
- password (str)
Password string
- host (str)
DB server host ip
- port (int)
DB server port
- database (str)
Name of the database in which to store job persistence
-
PROTOCOL
= 'mysql'¶
-
supported_options
= {'database': {'type': (<type 'basestring'>,)}, 'host': {'type': (<type 'basestring'>,)}, 'user': {'type': (<type 'basestring'>,)}, 'password': {'type': (<type 'basestring'>,)}, 'port': {'type': (<type 'int'>, <type 'long'>)}}¶
-
class
exporters.persistence.alchemy_persistence.
PostgresqlPersistence
(*args, **kwargs)[source]¶ Bases:
exporters.persistence.alchemy_persistence.BaseAlchemyPersistence
Manage export persistence using a postgresql database as a backend. It will add a row for every job in a table called Jobs.
- user (str)
Username with access to postgresql database
- password (str)
Password string
- host (str)
DB server host ip
- port (int)
DB server port
- database (str)
Name of the database in which to store job persistence
-
PROTOCOL
= 'postgresql'¶
-
supported_options
= {'database': {'type': (<type 'basestring'>,)}, 'host': {'type': (<type 'basestring'>,)}, 'user': {'type': (<type 'basestring'>,)}, 'password': {'type': (<type 'basestring'>,)}, 'port': {'type': (<type 'int'>, <type 'long'>)}}¶
-
class
exporters.persistence.alchemy_persistence.
SqlitePersistence
(*args, **kwargs)[source]¶ Bases:
exporters.persistence.alchemy_persistence.BaseAlchemyPersistence
Manage export persistence using a sqlite database as a backend. It will add a row for every job in a table called Jobs.
- user (str)
Username with access to the database (optional, defaults to an empty string)
- password (str)
Password string
- host (str)
DB server host ip
- port (int)
DB server port
- database (str)
Name of the database in which to store job persistence
-
PROTOCOL
= 'postgresql'¶
-
persistence_uri_re
= '(?P<proto>sqlite)://(?P<database>.+):(?P<job_id>\\d+)'¶
-
supported_options
= {'database': {'type': (<type 'basestring'>,)}, 'host': {'default': '', 'type': (<type 'basestring'>,)}, 'user': {'default': '', 'type': (<type 'basestring'>,)}, 'password': {'default': '', 'type': (<type 'basestring'>,)}, 'port': {'default': '', 'type': <type 'int'>}}¶
Notifications¶
You can define notifications for main export job events such as starting an export, ending or failing. These events can be sent to multiple destinations by adding proper modules to an export configuration.
-
class
exporters.notifications.base_notifier.
BaseNotifier
(options, metadata, *args, **kwargs)[source]¶ Bases:
exporters.pipeline.base_pipeline_item.BasePipelineItem
This module takes care of notifications delivery. Its architecture differs slightly from the other modules because multiple notification endpoints can be loaded at the same time. As you can see in the provided example, the notifications parameter is an array of notification objects. New notification endpoints can be added by implementing the following methods:
-
notify_failed_job
(mgs, stack_trace, receivers=None)[source]¶ Notifies the failure of a dump to the receivers
-
supported_options
= {}¶
-
Provided notifications¶
SESMailNotifier¶
WebhookNotifier¶
-
class
exporters.notifications.webhook_notifier.
WebhookNotifier
(*args, **kwargs)[source]¶ Bases:
exporters.notifications.base_notifier.BaseNotifier
Performs a POST request to provided endpoints
- endpoints (list)
- Endpoints waiting for a start notification
-
supported_options
= {'endpoints': {'default': [], 'type': <class 'exporters.utils.list[unicode]'>}}¶
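Because notifications are configured as an array of notification objects, a configuration fragment for this notifier might look like the sketch below, written as a Python structure. The name/options layout of each entry is an assumption about the configuration format, and the endpoint URL is hypothetical.

```python
# Assumed layout: each notification object names a module and its options.
notifications = [
    {
        'name': 'exporters.notifications.webhook_notifier.WebhookNotifier',
        'options': {
            # Hypothetical endpoint; every listed URL would receive a POST.
            'endpoints': ['https://example.com/hooks/export-events'],
        },
    },
]
```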
Grouping¶
This module adds support for grouping items. It must implement the following method:
- group_batch(batch)
- It adds grouping info to all the items from a batch. Every item, which is a BaseRecord, has a group_membership attribute that should be updated by this method before yielding it
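A rough sketch of group_batch(batch) follows. Record is a minimal stand-in for BaseRecord, and storing the membership as a tuple of key values is an assumption. The CountryGrouper class and the country field are hypothetical.

```python
class Record(dict):
    """Minimal stand-in for BaseRecord: a dict with a group_membership."""

    def __init__(self, *args, **kwargs):
        super(Record, self).__init__(*args, **kwargs)
        self.group_membership = ()


class CountryGrouper(object):
    """Stand-in grouper; a real module would subclass BaseGrouper."""

    keys = ['country']  # hypothetical grouping keys

    def group_batch(self, batch):
        for item in batch:
            # Update the membership before yielding the item.
            item.group_membership = tuple(
                item.get(key, 'unknown') for key in self.keys)
            yield item


batch = [Record(country='es'), Record(name='no country')]
grouped = list(CountryGrouper().group_batch(batch))
memberships = [item.group_membership for item in grouped]
```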
-
class
exporters.groupers.base_grouper.
BaseGrouper
(options, metadata=None)[source]¶ Bases:
exporters.pipeline.base_pipeline_item.BasePipelineItem
Base class for groupers
-
supported_options
= {}¶
-
Provided Groupers¶
FileKeyGrouper¶
-
class
exporters.groupers.file_key_grouper.
FileKeyGrouper
(*args, **kwargs)[source]¶ Bases:
exporters.groupers.base_grouper.BaseGrouper
Groups items depending on their keys. It adds the group membership information to items.
- keys (list)
- A list of keys to group by
-
supported_options
= {'keys': {'type': <class 'exporters.utils.list[unicode]'>}}¶
NoGrouper¶
PythonExpGrouper¶
-
class
exporters.groupers.python_exp_grouper.
PythonExpGrouper
(*args, **kwargs)[source]¶ Bases:
exporters.groupers.base_grouper.BaseGrouper
Groups items depending on python expressions. It adds the group membership information to items.
- python_expressions (list)
- A list of python expressions to group by
-
supported_options
= {'python_expressions': {'type': <class 'exporters.utils.list[unicode]'>}}¶
Stats Managers¶
This module provides support for keeping track of export statistics. A Stats Manager must implement the following methods:
- iteration_report(times, stats)
- It receives the times spent in every step of the export pipeline iteration, and the aggregated stats
- final_report(stats)
- Usually called at the end of an export job
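The two methods can be sketched with a stand-in that collects report lines in a list instead of logging them. It assumes times is a dict mapping step names to seconds and stats is a dict of counters. Both shapes, and the items_count key, are assumptions for illustration.

```python
class ListStatsManager(object):
    """Stand-in stats manager that stores report lines in memory."""

    def __init__(self):
        self.lines = []

    def iteration_report(self, times, stats):
        # Report the step that took longest in this iteration.
        slowest = max(times, key=times.get)
        self.lines.append('items=%d slowest_step=%s' % (
            stats.get('items_count', 0), slowest))

    def final_report(self, stats):
        self.lines.append('final: %r' % (stats,))


manager = ListStatsManager()
manager.iteration_report({'read': 0.2, 'write': 1.5}, {'items_count': 100})
manager.final_report({'items_count': 100})
```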
Provided Stats Managers¶
BasicStatsManager¶
-
class
exporters.stats_managers.basic_stats_manager.
BasicStatsManager
(*args, **kwargs)[source]¶ Bases:
exporters.stats_managers.base_stats_manager.BaseStatsManager
Module to be used when no stats tracking is needed. It does nothing for iteration reports, and only prints a debug log message with final stats
-
supported_options
= {}¶
-
LoggingStatsManager¶
-
class
exporters.stats_managers.logging_stats_manager.
LoggingStatsManager
(*args, **kwargs)[source]¶ Bases:
exporters.stats_managers.basic_stats_manager.BasicStatsManager
This stats manager prints a log message with useful stats and times for every pipeline iteration.
-
supported_options
= {}¶
-
Export Formatters¶
Exporters use formatter modules to export in different formats. An export formatter must implement the following method:
- format(batch)
- It adds formatting info to all the items from a batch. Every item, which is a BaseRecord, has a formatted attribute that should be updated by this method before yielding it
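As a minimal sketch of the format(batch) contract, the stand-in below serializes each item to a single JSON line and stores it in the item's formatted attribute. Record is a simplified substitute for BaseRecord, and the class names are hypothetical.

```python
import json


class Record(dict):
    """Minimal stand-in for BaseRecord: a dict with a formatted attribute."""
    formatted = None


class OneLineJsonFormatter(object):
    """Stand-in formatter; a real module would subclass BaseExportFormatter."""

    def format(self, batch):
        for item in batch:
            # sort_keys keeps the output deterministic.
            item.formatted = json.dumps(item, sort_keys=True)
            yield item


batch = [Record(b=1, a=2), Record(id=7)]
formatted_lines = [item.formatted
                   for item in OneLineJsonFormatter().format(batch)]
```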
Provided Export Formatters¶
JsonExportFormatter¶
-
class
exporters.export_formatter.json_export_formatter.
JsonExportFormatter
(*args, **kwargs)[source]¶ Bases:
exporters.export_formatter.base_export_formatter.BaseExportFormatter
This export formatter provides a way of exporting items in JSON format. This one is the default formatter.
- pretty_print(bool)
- If set to True, items will be exported with an indent of 2 and sorted keys; otherwise, each item is exported as a single line of text.
-
file_extension
= 'jl'¶
-
supported_options
= {'jsonlines': {'default': True, 'type': <type 'bool'>}, 'pretty_print': {'default': False, 'type': <type 'bool'>}}¶
CSVExportFormatter¶
-
class
exporters.export_formatter.csv_export_formatter.
CSVExportFormatter
(*args, **kwargs)[source]¶ Bases:
exporters.export_formatter.base_export_formatter.BaseExportFormatter
This export formatter provides a way of exporting items in CSV format. These are the supported options:
- show_titles(bool)
- If set to True, the first line of each exported file will be a row of column names
- fields(list)
- List of item fields to be exported
- schema(dict)
- Valid json schema of dataset items
- delimiter(str)
- Field delimiter character for CSV rows
-
file_extension
= 'csv'¶
-
supported_options
= {'fields': {'default': [], 'type': <class 'exporters.utils.list[unicode]'>}, 'show_titles': {'default': True, 'type': <type 'bool'>}, 'delimiter': {'default': ',', 'type': (<type 'basestring'>,)}, 'schema': {'default': {}, 'type': <type 'dict'>}}¶
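To show how the fields, show_titles and delimiter options interact, here is a sketch built on the standard csv module. The items_to_csv helper is hypothetical and is not the formatter's implementation. It is written for Python 3 and ignores the schema option.

```python
import csv
import io


def items_to_csv(items, fields, show_titles=True, delimiter=','):
    """Render items as CSV text, keeping only the listed fields."""
    output = io.StringIO()
    writer = csv.writer(output, delimiter=delimiter, lineterminator='\n')
    if show_titles:
        # The first row holds the column names.
        writer.writerow(fields)
    for item in items:
        writer.writerow([item.get(field, '') for field in fields])
    return output.getvalue()


items = [{'name': 'a', 'price': 1, 'internal_id': 9}, {'name': 'b'}]
csv_text = items_to_csv(items, fields=['name', 'price'], delimiter=';')
```

Fields missing from an item are written as empty cells, and fields not listed are dropped.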
XMLExportFormatter¶
-
class
exporters.export_formatter.xml_export_formatter.
XMLExportFormatter
(*args, **kwargs)[source]¶ Bases:
exporters.export_formatter.base_export_formatter.BaseExportFormatter
This export formatter provides a way of exporting items in XML format
-
file_extension
= 'xml'¶
-
supported_options
= {'fields_order': {'default': [], 'type': <class 'exporters.utils.list[unicode]'>}, 'item_name': {'default': 'item', 'type': (<type 'basestring'>,)}, 'root_name': {'default': 'root', 'type': (<type 'basestring'>,)}, 'xml_header': {'default': '<?xml version="1.0" encoding="UTF-8"?>', 'type': (<type 'basestring'>,)}, 'attr_type': {'default': True, 'type': <type 'bool'>}}¶
-
Decompressors¶
Decompressors take a compressed stream from a stream reader and return an uncompressed stream. They have to implement one function:
- decompress(stream)
- Decompress the input stream (returns an uncompressed stream)
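A possible shape for decompress(stream) is sketched below with the standard zlib module. It models the stream as an iterable of byte chunks, which is an assumption about the stream interface, and it is not the code of any provided decompressor.

```python
import zlib


def decompress(stream):
    """Yield uncompressed chunks from an iterable of zlib-compressed chunks."""
    decompressor = zlib.decompressobj()
    for chunk in stream:
        data = decompressor.decompress(chunk)
        if data:
            yield data
    # Emit whatever is still buffered once the input is exhausted.
    tail = decompressor.flush()
    if tail:
        yield tail


payload = b'{"id": 1}\n{"id": 2}\n' * 100
compressed = zlib.compress(payload)
# Feed the compressed bytes in small pieces to mimic a stream.
chunks = [compressed[i:i + 16] for i in range(0, len(compressed), 16)]
restored = b''.join(decompress(chunks))
```

Decompressing incrementally avoids loading the whole compressed input into memory at once.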
-
class
exporters.decompressors.
BaseDecompressor
(options, metadata, *args, **kwargs)[source]¶ Bases:
exporters.pipeline.base_pipeline_item.BasePipelineItem
-
supported_options
= {}¶
-
-
class
exporters.decompressors.
ZLibDecompressor
(options, metadata, *args, **kwargs)[source]¶ Bases:
exporters.decompressors.BaseDecompressor
-
supported_options
= {}¶
-
Deserializers¶
Deserializers take a stream of uncompressed bytes and return an iterator of records. They have to implement one function:
- deserialize(stream)
- Deserialize the input stream (return an iterator of records)
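As a sketch of deserialize(stream) for JSON lines input, the function below reads a byte stream line by line and yields one record per non-empty line. It assumes the stream is a file-like object that yields lines of UTF-8 bytes. It is an illustration, not the implementation of any provided deserializer.

```python
import io
import json


def deserialize(stream):
    """Yield one record per non-empty line of a JSON lines byte stream."""
    for line in stream:
        line = line.strip()
        if line:
            yield json.loads(line.decode('utf-8'))


# io.BytesIO stands in for an uncompressed byte stream.
stream = io.BytesIO(b'{"id": 1}\n\n{"id": 2}\n')
records = list(deserialize(stream))
```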
-
class
exporters.deserializers.
BaseDeserializer
(options, metadata, *args, **kwargs)[source]¶ Bases:
exporters.pipeline.base_pipeline_item.BasePipelineItem
-
supported_options
= {}¶
-
-
class
exporters.deserializers.
JsonLinesDeserializer
(options, metadata, *args, **kwargs)[source]¶ Bases:
exporters.deserializers.BaseDeserializer
-
supported_options
= {}¶
-