exporters.readers package

Submodules

exporters.readers.base_reader module

class exporters.readers.base_reader.BaseReader(options, metadata)[source]

Bases: exporters.pipeline.base_pipeline_item.BasePipelineItem

This class reads items and creates batches to pass to the pipeline.

close()[source]
get_all_metadata(module='reader')[source]
get_last_position()[source]

Returns the last read position.

get_metadata(key, module='reader')[source]
get_next_batch()[source]

This method is called from the manager. It must return a list or a generator of BaseRecord objects. When it has nothing else to read, it must set the class variable “finished” to True.

increase_read()[source]
is_finished()[source]

Returns whether or not there are items left to be read.

set_last_position(last_position)[source]

Called from the manager, this method updates the last position of data committed by the writer, in order to support resuming.

set_metadata(key, value, module='reader')[source]
supported_options = {}
update_metadata(data, module='reader')[source]
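
The contract above can be illustrated with a minimal custom reader. The sketch below assumes that BaseRecord is importable from exporters.records.base_record and that increase_read() increments the count of read items; class and variable names other than the BaseReader API are hypothetical.

    from exporters.readers.base_reader import BaseReader
    from exporters.records.base_record import BaseRecord  # assumed location


    class ListReader(BaseReader):
        """Hypothetical reader that serves records from an in-memory list."""

        supported_options = {}

        def __init__(self, *args, **kwargs):
            super(ListReader, self).__init__(*args, **kwargs)
            self._items = [{'id': 1}, {'id': 2}, {'id': 3}]
            self.last_position = 0

        def get_next_batch(self):
            if self.last_position >= len(self._items):
                # Nothing left to read: signal the manager to stop.
                self.finished = True
                return []
            batch = [BaseRecord(item) for item in self._items[self.last_position:]]
            for _ in batch:
                self.increase_read()  # assumed to increment the read-items counter
            self.last_position = len(self._items)
            return batch

        def set_last_position(self, last_position):
            # Restore the reading position on resumed exports.
            self.last_position = last_position or 0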

exporters.readers.hubstorage_reader module

class exporters.readers.hubstorage_reader.HubstorageReader(*args, **kwargs)[source]

Bases: exporters.readers.base_reader.BaseReader

This reader retrieves items from Scrapinghub Hubstorage collections.

  • batch_size (int)
    Number of items to be returned in each batch
  • apikey (str)
    API key with access to the project where the items are being generated.
  • project_id (int or str)
    ID of the project.
  • collection_name (str)
    Name of the collection of items.
  • count (int)
    Number of records to read from collection.
  • prefixes (list)
    Only include records with given key prefixes.
  • exclude_prefixes (list)
    Exclude records with given key prefixes.
  • secondary_collections (list)
    A list of secondary collections to merge from.
  • startts (int or str)
    Either milliseconds since epoch, or date string.
  • endts (int or str)
    Either milliseconds since epoch, or date string.
get_next_batch()[source]

This method is called from the manager. It must return a list or a generator of BaseRecord objects. When it has nothing else to read, it must set the class variable “finished” to True.

set_last_position(last_position)[source]

Called from the manager, this method updates the last position of data committed by the writer, in order to support resuming.

supported_options = {'count': {'default': 0, 'type': (<type 'int'>, <type 'long'>)}, 'secondary_collections': {'default': [], 'type': <class 'exporters.utils.list[unicode]'>}, 'apikey': {'type': (<type 'basestring'>,), 'env_fallback': 'EXPORTERS_HS_APIKEY'}, 'collection_name': {'type': (<type 'basestring'>,)}, 'startts': {'default': None, 'type': (<type 'int'>, <type 'long'>, <type 'basestring'>)}, 'batch_size': {'default': 10000, 'type': (<type 'int'>, <type 'long'>)}, 'prefixes': {'default': [], 'type': <class 'exporters.utils.list[unicode]'>}, 'exclude_prefixes': {'default': [], 'type': <class 'exporters.utils.list[unicode]'>}, 'endts': {'default': None, 'type': (<type 'int'>, <type 'long'>, <type 'basestring'>)}, 'project_id': {'type': (<type 'int'>, <type 'long'>, <type 'basestring'>)}}
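
For orientation, the reader section of an export configuration might be filled in as sketched below. Only the option keys are taken from supported_options above; the 'name'/'options' layout and all values are illustrative, and apikey can also be provided through the EXPORTERS_HS_APIKEY environment variable (its env_fallback).

    # Hypothetical configuration values; only the option keys come from
    # supported_options above.
    reader_config = {
        'name': 'exporters.readers.hubstorage_reader.HubstorageReader',
        'options': {
            'apikey': '<YOUR_APIKEY>',        # or set EXPORTERS_HS_APIKEY
            'project_id': 12345,              # example project id
            'collection_name': 'products',
            'batch_size': 10000,              # default
            'startts': '2016-01-01',          # ms since epoch or date string
            'endts': '2016-02-01',
        }
    }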

exporters.readers.kafka_random_reader module

Kafka random reader

class exporters.readers.kafka_random_reader.KafkaRandomReader(*args, **kwargs)[source]

Bases: exporters.readers.base_reader.BaseReader

This reader retrieves a random subset of items from kafka brokers.

  • record_count (int)
    Number of items to be returned in total
  • batch_size (int)
    Number of items to be returned in each batch
  • brokers (list)
    List of broker URIs.
  • topic (str)
    Topic to read from.
  • group (str)
    Reading group for kafka client.
consume_messages(batchsize)[source]

Get a batch of messages from the reservoir

create_processor()[source]
decompress_messages(offmsgs)[source]

Decompress pre-defined compressed fields for each message. Messages should be unpacked before this step.

fill_reservoir()[source]
get_from_kafka(*args, **kw)[source]

Method called to get and process a batch

get_next_batch()[source]

This method is called from the manager. It must return a list or a generator of BaseRecord objects. When it has nothing else to read, it must set the class variable “finished” to True.

set_last_position(last_position)[source]

Called from the manager, this method updates the last position of data committed by the writer, in order to support resuming.

supported_options = {'record_count': {'type': (<type 'int'>, <type 'long'>)}, 'group': {'type': (<type 'basestring'>,)}, 'batch_size': {'default': 10000, 'type': (<type 'int'>, <type 'long'>)}, 'topic': {'type': (<type 'basestring'>,)}, 'brokers': {'type': <class 'exporters.utils.list[unicode]'>}}
static unpack_messages(msgs)[source]
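
A hedged sketch of how this reader's options might be configured; the 'name'/'options' layout, broker addresses, topic and group names are illustrative, while the option keys come from supported_options above.

    # Hypothetical configuration values; only the option keys come from
    # supported_options above.
    reader_config = {
        'name': 'exporters.readers.kafka_random_reader.KafkaRandomReader',
        'options': {
            'record_count': 50000,                       # total items to sample
            'batch_size': 10000,                         # default
            'brokers': ['kafka1:9092', 'kafka2:9092'],   # example broker URIs
            'topic': 'scraped-items',
            'group': 'exporters-random-sample',
        }
    }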

exporters.readers.kafka_scanner_reader module

Kafka reader

class exporters.readers.kafka_scanner_reader.KafkaScannerReader(*args, **kwargs)[source]

Bases: exporters.readers.base_reader.BaseReader

This reader retrieves items from kafka brokers.

  • batch_size (int)
    Number of items to be returned in each batch
  • brokers (list)
    List of broker URIs.
  • topic (str)
    Topic to read from.
  • partitions (list)
    Partitions to read from.
get_from_kafka(*args, **kw)[source]
get_next_batch()[source]

This method is called from the manager. It must return a list or a generator of BaseRecord objects. When it has nothing else to read, it must set the class variable “finished” to True.

set_last_position(last_position)[source]

Called from the manager, this method updates the last position of data committed by the writer, in order to support resuming.

supported_options = {'batch_size': {'default': 10000, 'type': (<type 'int'>, <type 'long'>)}, 'topic': {'type': (<type 'basestring'>,)}, 'ssl_configs': {'default': None, 'type': <type 'dict'>}, 'brokers': {'type': <class 'exporters.utils.list[unicode]'>}, 'partitions': {'default': None, 'type': <class 'exporters.utils.list[int]'>}}
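
A hedged sketch of this reader's options; the 'name'/'options' layout and the values are illustrative, while the option keys come from supported_options above.

    # Hypothetical configuration values; only the option keys come from
    # supported_options above.
    reader_config = {
        'name': 'exporters.readers.kafka_scanner_reader.KafkaScannerReader',
        'options': {
            'batch_size': 10000,                         # default
            'brokers': ['kafka1:9092', 'kafka2:9092'],   # example broker URIs
            'topic': 'scraped-items',
            'partitions': [0, 1],                        # optional (default: None)
        }
    }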

exporters.readers.random_reader module

Random items generator, just for testing purposes

class exporters.readers.random_reader.RandomReader(*args, **kwargs)[source]

Bases: exporters.readers.base_reader.BaseReader

A reader intended for testing purposes. It generates random data in the quantity set in its config section.

  • number_of_items (int)
    Number of total items that must be returned by the reader before finishing.
  • batch_size (int)
    Number of items to be returned in each batch.
get_next_batch()[source]

This method is called from the manager. It must return a list or a generator of BaseRecord objects. When it has nothing else to read, it must set the class variable “finished” to True.

set_last_position(last_position)[source]

Called from the manager, this method updates the last position of data committed by the writer, in order to support resuming.

supported_options = {'batch_size': {'default': 100, 'type': (<type 'int'>, <type 'long'>)}, 'number_of_items': {'default': 1000, 'type': (<type 'int'>, <type 'long'>)}}
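
A minimal options sketch for this reader; the 'name'/'options' layout is assumed, and both options shown keep their defaults.

    reader_config = {
        'name': 'exporters.readers.random_reader.RandomReader',
        'options': {
            'number_of_items': 1000,  # default
            'batch_size': 100,        # default
        }
    }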

exporters.readers.s3_reader module

class exporters.readers.s3_reader.S3BucketKeysFetcher(reader_options, aws_access_key_id, aws_secret_access_key)[source]

Bases: object

pending_keys()[source]
class exporters.readers.s3_reader.S3Reader(*args, **kwargs)[source]

Bases: exporters.readers.base_stream_reader.StreamBasedReader

Reads items from gzip-compressed keys under a common path in an S3 bucket.

  • bucket (str)

    Name of the bucket to retrieve items from.

  • aws_access_key_id (str)

    Public access key to the s3 bucket.

  • aws_secret_access_key (str)

    Secret access key to the s3 bucket.

  • prefix (str)

    Prefix of s3 keys to be read.

  • prefix_pointer (str)

    Prefix pointing to the last version of the dataset. This adds support for regular exports. For example:

    Suppose we have a weekly export scheduled with cron. If we wanted to point to a new data prefix every week, we would have to keep updating the export configuration. With a pointer, we can instead set the reader to read from that key, which contains one or several lines with valid prefixes to datasets, so only the pointer file needs to be updated.

  • pattern (str)

    S3 key name pattern (regular expression). Keys that don’t match this regex will be discarded by the reader.

get_read_streams()[source]
open_stream(stream)[source]
supported_options = {'pattern': {'default': None, 'type': (<type 'basestring'>,)}, 'bucket': {'type': (<type 'basestring'>,)}, 'batch_size': {'default': 10000, 'type': (<type 'int'>, <type 'long'>)}, 'prefix': {'default': '', 'type': (<type 'basestring'>, <type 'list'>)}, 'prefix_format_using_date': {'default': None, 'type': (<type 'basestring'>, <type 'tuple'>, <type 'list'>)}, 'aws_secret_access_key': {'type': (<type 'basestring'>,), 'env_fallback': 'EXPORTERS_S3READER_AWS_SECRET'}, 'prefix_pointer': {'default': None, 'type': (<type 'basestring'>,)}, 'aws_access_key_id': {'type': (<type 'basestring'>,), 'env_fallback': 'EXPORTERS_S3READER_AWS_KEY'}}
exporters.readers.s3_reader.format_prefixes(prefixes, start, end)[source]
exporters.readers.s3_reader.get_bucket(bucket, aws_access_key_id, aws_secret_access_key, **kwargs)[source]
exporters.readers.s3_reader.patch_http_response_read(func)[source]
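
For the S3Reader above, the options might be filled in as sketched below. The option keys come from its supported_options; the 'name'/'options' layout, bucket name, prefix and pattern are illustrative, and the credentials can also be provided through the EXPORTERS_S3READER_AWS_KEY and EXPORTERS_S3READER_AWS_SECRET environment variables (their env_fallback).

    # Hypothetical configuration values; only the option keys come from
    # supported_options above.
    reader_config = {
        'name': 'exporters.readers.s3_reader.S3Reader',
        'options': {
            'bucket': 'my-export-bucket',
            'aws_access_key_id': '<ACCESS_KEY>',      # or EXPORTERS_S3READER_AWS_KEY
            'aws_secret_access_key': '<SECRET_KEY>',  # or EXPORTERS_S3READER_AWS_SECRET
            'prefix': 'exports/products/2016-05-01/',
            'pattern': r'output.*\.jl\.gz$',          # optional key-name filter
        }
    }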

Module contents

class exporters.readers.S3Reader(*args, **kwargs)[source]

Bases: exporters.readers.base_stream_reader.StreamBasedReader

Reads items from gzip-compressed keys under a common path in an S3 bucket.

  • bucket (str)

    Name of the bucket to retrieve items from.

  • aws_access_key_id (str)

    Public access key to the s3 bucket.

  • aws_secret_access_key (str)

    Secret access key to the s3 bucket.

  • prefix (str)

    Prefix of s3 keys to be read.

  • prefix_pointer (str)

    Prefix pointing to the last version of the dataset. This adds support for regular exports. For example:

    Suppose we have a weekly export scheduled with cron. If we wanted to point to a new data prefix every week, we would have to keep updating the export configuration. With a pointer, we can instead set the reader to read from that key, which contains one or several lines with valid prefixes to datasets, so only the pointer file needs to be updated.

  • pattern (str)

    S3 key name pattern (regular expression). Keys that don’t match this regex will be discarded by the reader.

get_read_streams()[source]
open_stream(stream)[source]
supported_options = {'pattern': {'default': None, 'type': (<type 'basestring'>,)}, 'bucket': {'type': (<type 'basestring'>,)}, 'batch_size': {'default': 10000, 'type': (<type 'int'>, <type 'long'>)}, 'prefix': {'default': '', 'type': (<type 'basestring'>, <type 'list'>)}, 'prefix_format_using_date': {'default': None, 'type': (<type 'basestring'>, <type 'tuple'>, <type 'list'>)}, 'aws_secret_access_key': {'type': (<type 'basestring'>,), 'env_fallback': 'EXPORTERS_S3READER_AWS_SECRET'}, 'prefix_pointer': {'default': None, 'type': (<type 'basestring'>,)}, 'aws_access_key_id': {'type': (<type 'basestring'>,), 'env_fallback': 'EXPORTERS_S3READER_AWS_KEY'}}
class exporters.readers.RandomReader(*args, **kwargs)[source]

Bases: exporters.readers.base_reader.BaseReader

A reader intended for testing purposes. It generates random data in the quantity set in its config section.

  • number_of_items (int)
    Number of total items that must be returned by the reader before finishing.
  • batch_size (int)
    Number of items to be returned in each batch.
get_next_batch()[source]

This method is called from the manager. It must return a list or a generator of BaseRecord objects. When it has nothing else to read, it must set the class variable “finished” to True.

set_last_position(last_position)[source]

Called from the manager, this method updates the last position of data committed by the writer, in order to support resuming.

supported_options = {'batch_size': {'default': 100, 'type': (<type 'int'>, <type 'long'>)}, 'number_of_items': {'default': 1000, 'type': (<type 'int'>, <type 'long'>)}}
class exporters.readers.HubstorageReader(*args, **kwargs)[source]

Bases: exporters.readers.base_reader.BaseReader

This reader retrieves items from Scrapinghub Hubstorage collections.

  • batch_size (int)
    Number of items to be returned in each batch
  • apikey (str)
    API key with access to the project where the items are being generated.
  • project_id (int or str)
    ID of the project.
  • collection_name (str)
    Name of the collection of items.
  • count (int)
    Number of records to read from collection.
  • prefixes (list)
    Only include records with given key prefixes.
  • exclude_prefixes (list)
    Exclude records with given key prefixes.
  • secondary_collections (list)
    A list of secondary collections to merge from.
  • startts (int or str)
    Either milliseconds since epoch, or date string.
  • endts (int or str)
    Either milliseconds since epoch, or date string.
get_next_batch()[source]

This method is called from the manager. It must return a list or a generator of BaseRecord objects. When it has nothing else to read, it must set the class variable “finished” to True.

set_last_position(last_position)[source]

Called from the manager, this method updates the last position of data committed by the writer, in order to support resuming.

supported_options = {'count': {'default': 0, 'type': (<type 'int'>, <type 'long'>)}, 'secondary_collections': {'default': [], 'type': <class 'exporters.utils.list[unicode]'>}, 'apikey': {'type': (<type 'basestring'>,), 'env_fallback': 'EXPORTERS_HS_APIKEY'}, 'collection_name': {'type': (<type 'basestring'>,)}, 'startts': {'default': None, 'type': (<type 'int'>, <type 'long'>, <type 'basestring'>)}, 'batch_size': {'default': 10000, 'type': (<type 'int'>, <type 'long'>)}, 'prefixes': {'default': [], 'type': <class 'exporters.utils.list[unicode]'>}, 'exclude_prefixes': {'default': [], 'type': <class 'exporters.utils.list[unicode]'>}, 'endts': {'default': None, 'type': (<type 'int'>, <type 'long'>, <type 'basestring'>)}, 'project_id': {'type': (<type 'int'>, <type 'long'>, <type 'basestring'>)}}
class exporters.readers.KafkaScannerReader(*args, **kwargs)[source]

Bases: exporters.readers.base_reader.BaseReader

This reader retrieves items from kafka brokers.

  • batch_size (int)
    Number of items to be returned in each batch
  • brokers (list)
    List of broker URIs.
  • topic (str)
    Topic to read from.
  • partitions (list)
    Partitions to read from.
get_from_kafka(*args, **kw)[source]
get_next_batch()[source]

This method is called from the manager. It must return a list or a generator of BaseRecord objects. When it has nothing else to read, it must set the class variable “finished” to True.

set_last_position(last_position)[source]

Called from the manager, this method updates the last position of data committed by the writer, in order to support resuming.

supported_options = {'batch_size': {'default': 10000, 'type': (<type 'int'>, <type 'long'>)}, 'topic': {'type': (<type 'basestring'>,)}, 'ssl_configs': {'default': None, 'type': <type 'dict'>}, 'brokers': {'type': <class 'exporters.utils.list[unicode]'>}, 'partitions': {'default': None, 'type': <class 'exporters.utils.list[int]'>}}
class exporters.readers.KafkaRandomReader(*args, **kwargs)[source]

Bases: exporters.readers.base_reader.BaseReader

This reader retrieves a random subset of items from kafka brokers.

  • record_count (int)
    Number of items to be returned in total
  • batch_size (int)
    Number of items to be returned in each batch
  • brokers (list)
    List of broker URIs.
  • topic (str)
    Topic to read from.
  • group (str)
    Reading group for kafka client.
consume_messages(batchsize)[source]

Get a batch of messages from the reservoir

create_processor()[source]
decompress_messages(offmsgs)[source]

Decompress pre-defined compressed fields for each message. Messages should be unpacked before this step.

fill_reservoir()[source]
get_from_kafka(*args, **kw)[source]

Method called to get and process a batch

get_next_batch()[source]

This method is called from the manager. It must return a list or a generator of BaseRecord objects. When it has nothing else to read, it must set the class variable “finished” to True.

set_last_position(last_position)[source]

Called from the manager, this method updates the last position of data committed by the writer, in order to support resuming.

supported_options = {'record_count': {'type': (<type 'int'>, <type 'long'>)}, 'group': {'type': (<type 'basestring'>,)}, 'batch_size': {'default': 10000, 'type': (<type 'int'>, <type 'long'>)}, 'topic': {'type': (<type 'basestring'>,)}, 'brokers': {'type': <class 'exporters.utils.list[unicode]'>}}
static unpack_messages(msgs)[source]
class exporters.readers.FSReader(*args, **kwargs)[source]

Bases: exporters.readers.base_stream_reader.StreamBasedReader

Reads items from gzip-compressed files under a common path in the filesystem.

  • input (str/dict or a list of str/dict)

    Specification of files to be read.

    Accepts either one “input_unit” or many of them in a list. “input_unit” is defined as follows:

    If a string, it indicates a filename, e.g. “/path/to/filename”.

    If a dictionary, it indicates a directory to be read with the following elements:

    • “dir”: path to directory, e.g. “/path/to/dir”.

    • “dir_pointer”: path to file containing path to directory, e.g. “/path/to/pointer/file” which contains “/path/to/dir”. Cannot be used together with “dir”.

      For example:

      Suppose we have a weekly export scheduled with cron. If we wanted to point to a new data path every week, we would have to keep updating the export configuration. With a pointer, we can instead set the reader to read from that file, which contains one line with a valid path to a dataset, so only the pointer file needs to be updated.

    • “pattern”: (optional) regular expression to filter filenames, e.g. “output.*.jl.gz$”

get_read_streams()[source]
open_stream(stream)[source]
supported_options = {'input': {'default': {'dir': ''}, 'type': (<type 'str'>, <type 'dict'>, <type 'list'>)}, 'batch_size': {'default': 10000, 'type': (<type 'int'>, <type 'long'>)}}
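
A hedged sketch of the FSReader input option; the 'name'/'options' layout and the paths are illustrative, while the option keys and the accepted input forms follow the description above.

    # Hypothetical configuration values; only the option keys and input forms
    # come from the description above.
    reader_config = {
        'name': 'exporters.readers.FSReader',  # as exposed by the package
        'options': {
            # "input" accepts a filename, a directory spec, or a list mixing both.
            'input': [
                '/path/to/part-000.jl.gz',              # a single file
                {'dir': '/path/to/dir',                 # a whole directory,
                 'pattern': r'output.*\.jl\.gz$'},      # optionally filtered by regex
            ],
            'batch_size': 10000,                        # default
        }
    }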