exporters.readers package¶
Submodules¶
exporters.readers.base_reader module¶
class exporters.readers.base_reader.BaseReader(options, metadata)¶
Bases: exporters.pipeline.base_pipeline_item.BasePipelineItem
Reads items and builds batches of records to pass along the pipeline.
get_next_batch()¶
Called from the manager. It must return a list or a generator of BaseRecord objects; when there is nothing left to read, it must set the class variable “finished” to True.
set_last_position(last_position)¶
Called from the manager. It updates the last position of data committed by the writer, so that an interrupted export can be resumed.
supported_options = {}¶
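To illustrate the contract above, here is a minimal, hypothetical reader sketch. It assumes the base pipeline item exposes a read_option() helper for declared options and that plain dicts are acceptable as records; the class name ListReader and the 'items' option are made up for this example:

    from exporters.readers.base_reader import BaseReader


    class ListReader(BaseReader):
        """Hypothetical reader that serves items from an in-memory list."""

        # Declares the options this reader accepts (cf. supported_options above).
        supported_options = {
            'items': {'type': list, 'default': []},
        }

        def __init__(self, options, metadata):
            super(ListReader, self).__init__(options, metadata)
            self.items = self.read_option('items')  # assumed BasePipelineItem helper
            self.position = 0
            self.batch_size = 100

        def get_next_batch(self):
            # Return a list (or generator) of records; set "finished" once exhausted.
            batch = self.items[self.position:self.position + self.batch_size]
            self.position += len(batch)
            if self.position >= len(self.items):
                self.finished = True
            return batch

        def set_last_position(self, last_position):
            # The manager passes in the last position committed by the writer,
            # so an interrupted export can resume from that point.
            self.last_position = last_position or {'position': 0}
            self.position = self.last_position.get('position', 0)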
exporters.readers.hubstorage_reader module¶
class exporters.readers.hubstorage_reader.HubstorageReader(*args, **kwargs)¶
Bases: exporters.readers.base_reader.BaseReader
This reader retrieves items from Scrapinghub Hubstorage collections.
- batch_size (int): Number of items to be returned in each batch.
- apikey (str): API key with access to the project where the items are being generated.
- project_id (int or str): ID of the project.
- collection_name (str): Name of the collection of items.
- count (int): Number of records to read from the collection.
- prefixes (list): Only include records with the given key prefixes.
- exclude_prefixes (list): Exclude records with the given key prefixes.
- secondary_collections (list): A list of secondary collections to merge from.
- startts (int or str): Start timestamp, either milliseconds since epoch or a date string.
- endts (int or str): End timestamp, either milliseconds since epoch or a date string.
get_next_batch()¶
Called from the manager. It must return a list or a generator of BaseRecord objects; when there is nothing left to read, it must set the class variable “finished” to True.
set_last_position(last_position)¶
Called from the manager. It updates the last position of data committed by the writer, so that an interrupted export can be resumed.
supported_options = {'count': {'default': 0, 'type': (<type 'int'>, <type 'long'>)}, 'secondary_collections': {'default': [], 'type': <class 'exporters.utils.list[unicode]'>}, 'apikey': {'type': (<type 'basestring'>,), 'env_fallback': 'EXPORTERS_HS_APIKEY'}, 'collection_name': {'type': (<type 'basestring'>,)}, 'startts': {'default': None, 'type': (<type 'int'>, <type 'long'>, <type 'basestring'>)}, 'batch_size': {'default': 10000, 'type': (<type 'int'>, <type 'long'>)}, 'prefixes': {'default': [], 'type': <class 'exporters.utils.list[unicode]'>}, 'exclude_prefixes': {'default': [], 'type': <class 'exporters.utils.list[unicode]'>}, 'endts': {'default': None, 'type': (<type 'int'>, <type 'long'>, <type 'basestring'>)}, 'project_id': {'type': (<type 'int'>, <type 'long'>, <type 'basestring'>)}}¶
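A hedged example of how this reader might be selected in an export configuration, assuming the usual "name"/"options" layout of a reader section; the project and collection values are placeholders, and the apikey can instead come from the EXPORTERS_HS_APIKEY environment variable (see supported_options above):

    # Reader section of a hypothetical export configuration (placeholder values).
    reader_config = {
        "name": "exporters.readers.hubstorage_reader.HubstorageReader",
        "options": {
            "project_id": "12345",           # placeholder project id
            "collection_name": "products",   # placeholder collection name
            "apikey": "<YOUR_API_KEY>",      # or set EXPORTERS_HS_APIKEY instead
            "batch_size": 10000,
            "prefixes": [],                  # optionally restrict to key prefixes
            "exclude_prefixes": []           # optionally exclude key prefixes
        }
    }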
exporters.readers.kafka_random_reader module¶
Kafka random reader
class exporters.readers.kafka_random_reader.KafkaRandomReader(*args, **kwargs)¶
Bases: exporters.readers.base_reader.BaseReader
This reader retrieves a random subset of items from Kafka brokers.
- record_count (int): Number of items to be returned in total.
- batch_size (int): Number of items to be returned in each batch.
- brokers (list): List of broker URIs.
- topic (str): Topic to read from.
- group (str): Consumer group for the Kafka client.
decompress_messages(offmsgs)¶
Decompresses the predefined compressed fields of each message. Messages should be unpacked before this step.
get_next_batch()¶
Called from the manager. It must return a list or a generator of BaseRecord objects; when there is nothing left to read, it must set the class variable “finished” to True.
set_last_position(last_position)¶
Called from the manager. It updates the last position of data committed by the writer, so that an interrupted export can be resumed.
supported_options = {'record_count': {'type': (<type 'int'>, <type 'long'>)}, 'group': {'type': (<type 'basestring'>,)}, 'batch_size': {'default': 10000, 'type': (<type 'int'>, <type 'long'>)}, 'topic': {'type': (<type 'basestring'>,)}, 'brokers': {'type': <class 'exporters.utils.list[unicode]'>}}¶
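A hedged configuration sketch for sampling a random subset of a topic; the broker, topic, and group values are placeholders:

    # Reader section of a hypothetical export configuration (placeholder values).
    reader_config = {
        "name": "exporters.readers.kafka_random_reader.KafkaRandomReader",
        "options": {
            "record_count": 50000,                      # total items to return
            "batch_size": 10000,
            "brokers": ["kafka1:9092", "kafka2:9092"],  # placeholder broker URIs
            "topic": "scraped-items",                   # placeholder topic
            "group": "exporters-random-reader"          # placeholder consumer group
        }
    }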
exporters.readers.kafka_scanner_reader module¶
Kafka reader
class exporters.readers.kafka_scanner_reader.KafkaScannerReader(*args, **kwargs)¶
Bases: exporters.readers.base_reader.BaseReader
This reader retrieves items from Kafka brokers.
- batch_size (int): Number of items to be returned in each batch.
- brokers (list): List of broker URIs.
- topic (str): Topic to read from.
- partitions (list): Partitions to read from.
get_next_batch()¶
Called from the manager. It must return a list or a generator of BaseRecord objects; when there is nothing left to read, it must set the class variable “finished” to True.
set_last_position(last_position)¶
Called from the manager. It updates the last position of data committed by the writer, so that an interrupted export can be resumed.
supported_options = {'batch_size': {'default': 10000, 'type': (<type 'int'>, <type 'long'>)}, 'topic': {'type': (<type 'basestring'>,)}, 'ssl_configs': {'default': None, 'type': <type 'dict'>}, 'brokers': {'type': <class 'exporters.utils.list[unicode]'>}, 'partitions': {'default': None, 'type': <class 'exporters.utils.list[int]'>}}¶
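Likewise, a hedged reader section for the scanner-based Kafka reader; broker, topic, and partition values are placeholders, and partitions may be omitted (its default is None):

    # Reader section of a hypothetical export configuration (placeholder values).
    reader_config = {
        "name": "exporters.readers.kafka_scanner_reader.KafkaScannerReader",
        "options": {
            "batch_size": 10000,
            "brokers": ["kafka1:9092"],   # placeholder broker URI
            "topic": "scraped-items",     # placeholder topic
            "partitions": [0, 1]          # optional; defaults to None
        }
    }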
exporters.readers.random_reader module¶
Random item generator, for testing purposes only.
class exporters.readers.random_reader.RandomReader(*args, **kwargs)¶
Bases: exporters.readers.base_reader.BaseReader
A reader intended for testing purposes. It generates random data in the quantity set in its config section.
- number_of_items (int): Total number of items the reader must return before finishing.
- batch_size (int): Number of items to be returned in each batch.
get_next_batch()¶
Called from the manager. It must return a list or a generator of BaseRecord objects; when there is nothing left to read, it must set the class variable “finished” to True.
set_last_position(last_position)¶
Called from the manager. It updates the last position of data committed by the writer, so that an interrupted export can be resumed.
supported_options = {'batch_size': {'default': 100, 'type': (<type 'int'>, <type 'long'>)}, 'number_of_items': {'default': 1000, 'type': (<type 'int'>, <type 'long'>)}}¶
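Because this reader needs no external services, it is handy for end-to-end smoke tests. Below is a hedged sketch of a complete configuration and run, assuming a console writer is available at exporters.writers.console_writer.ConsoleWriter and that BasicExporter.from_configuration() accepts a configuration dict; neither is documented on this page:

    from exporters import BasicExporter  # assumed entry point

    config = {
        "reader": {
            "name": "exporters.readers.random_reader.RandomReader",
            "options": {"number_of_items": 1000, "batch_size": 100}
        },
        "writer": {
            "name": "exporters.writers.console_writer.ConsoleWriter",  # assumed writer
            "options": {}
        }
    }

    exporter = BasicExporter.from_configuration(config)  # assumed constructor
    exporter.export()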
exporters.readers.s3_reader module¶
class exporters.readers.s3_reader.S3BucketKeysFetcher(reader_options, aws_access_key_id, aws_secret_access_key)¶
Bases: object
class exporters.readers.s3_reader.S3Reader(*args, **kwargs)¶
Bases: exporters.readers.base_stream_reader.StreamBasedReader
Reads items from gzip-compressed keys under a common path in an S3 bucket.
- bucket (str): Name of the bucket to retrieve items from.
- aws_access_key_id (str): Public access key for the S3 bucket.
- aws_secret_access_key (str): Secret access key for the S3 bucket.
- prefix (str): Prefix of the S3 keys to be read.
- prefix_pointer (str): Prefix pointing to the latest version of a dataset. This adds support for recurring exports. For example, with a weekly export scheduled via cron, pointing to a new data prefix every week would require updating the export configuration each time. With a pointer, the reader reads from that key, which contains one or several lines with valid prefixes to datasets, so only the pointer file needs to be updated.
- pattern (str): Regular expression applied to S3 key names; keys that do not match are discarded by the reader.
supported_options = {'pattern': {'default': None, 'type': (<type 'basestring'>,)}, 'bucket': {'type': (<type 'basestring'>,)}, 'batch_size': {'default': 10000, 'type': (<type 'int'>, <type 'long'>)}, 'prefix': {'default': '', 'type': (<type 'basestring'>, <type 'list'>)}, 'prefix_format_using_date': {'default': None, 'type': (<type 'basestring'>, <type 'tuple'>, <type 'list'>)}, 'aws_secret_access_key': {'type': (<type 'basestring'>,), 'env_fallback': 'EXPORTERS_S3READER_AWS_SECRET'}, 'prefix_pointer': {'default': None, 'type': (<type 'basestring'>,)}, 'aws_access_key_id': {'type': (<type 'basestring'>,), 'env_fallback': 'EXPORTERS_S3READER_AWS_KEY'}}¶
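A hedged reader section for S3; the bucket, prefix, and pattern values are placeholders, and the credentials can instead come from the EXPORTERS_S3READER_AWS_KEY / EXPORTERS_S3READER_AWS_SECRET environment variables (see supported_options above):

    # Reader section of a hypothetical export configuration (placeholder values).
    reader_config = {
        "name": "exporters.readers.s3_reader.S3Reader",
        "options": {
            "bucket": "my-export-bucket",             # placeholder bucket name
            "aws_access_key_id": "<ACCESS_KEY>",      # or EXPORTERS_S3READER_AWS_KEY
            "aws_secret_access_key": "<SECRET_KEY>",  # or EXPORTERS_S3READER_AWS_SECRET
            "prefix": "exports/latest/",              # placeholder key prefix
            "pattern": "output.*.jl.gz$"              # keys not matching are discarded
        }
    }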
Module contents¶
class exporters.readers.S3Reader(*args, **kwargs)¶
Bases: exporters.readers.base_stream_reader.StreamBasedReader
Reads items from gzip-compressed keys under a common path in an S3 bucket.
- bucket (str): Name of the bucket to retrieve items from.
- aws_access_key_id (str): Public access key for the S3 bucket.
- aws_secret_access_key (str): Secret access key for the S3 bucket.
- prefix (str): Prefix of the S3 keys to be read.
- prefix_pointer (str): Prefix pointing to the latest version of a dataset. This adds support for recurring exports. For example, with a weekly export scheduled via cron, pointing to a new data prefix every week would require updating the export configuration each time. With a pointer, the reader reads from that key, which contains one or several lines with valid prefixes to datasets, so only the pointer file needs to be updated.
- pattern (str): Regular expression applied to S3 key names; keys that do not match are discarded by the reader.
supported_options = {'pattern': {'default': None, 'type': (<type 'basestring'>,)}, 'bucket': {'type': (<type 'basestring'>,)}, 'batch_size': {'default': 10000, 'type': (<type 'int'>, <type 'long'>)}, 'prefix': {'default': '', 'type': (<type 'basestring'>, <type 'list'>)}, 'prefix_format_using_date': {'default': None, 'type': (<type 'basestring'>, <type 'tuple'>, <type 'list'>)}, 'aws_secret_access_key': {'type': (<type 'basestring'>,), 'env_fallback': 'EXPORTERS_S3READER_AWS_SECRET'}, 'prefix_pointer': {'default': None, 'type': (<type 'basestring'>,)}, 'aws_access_key_id': {'type': (<type 'basestring'>,), 'env_fallback': 'EXPORTERS_S3READER_AWS_KEY'}}¶
class exporters.readers.RandomReader(*args, **kwargs)¶
Bases: exporters.readers.base_reader.BaseReader
A reader intended for testing purposes. It generates random data in the quantity set in its config section.
- number_of_items (int): Total number of items the reader must return before finishing.
- batch_size (int): Number of items to be returned in each batch.
get_next_batch()¶
Called from the manager. It must return a list or a generator of BaseRecord objects; when there is nothing left to read, it must set the class variable “finished” to True.
set_last_position(last_position)¶
Called from the manager. It updates the last position of data committed by the writer, so that an interrupted export can be resumed.
supported_options = {'batch_size': {'default': 100, 'type': (<type 'int'>, <type 'long'>)}, 'number_of_items': {'default': 1000, 'type': (<type 'int'>, <type 'long'>)}}¶
class exporters.readers.HubstorageReader(*args, **kwargs)¶
Bases: exporters.readers.base_reader.BaseReader
This reader retrieves items from Scrapinghub Hubstorage collections.
- batch_size (int): Number of items to be returned in each batch.
- apikey (str): API key with access to the project where the items are being generated.
- project_id (int or str): ID of the project.
- collection_name (str): Name of the collection of items.
- count (int): Number of records to read from the collection.
- prefixes (list): Only include records with the given key prefixes.
- exclude_prefixes (list): Exclude records with the given key prefixes.
- secondary_collections (list): A list of secondary collections to merge from.
- startts (int or str): Start timestamp, either milliseconds since epoch or a date string.
- endts (int or str): End timestamp, either milliseconds since epoch or a date string.
get_next_batch()¶
Called from the manager. It must return a list or a generator of BaseRecord objects; when there is nothing left to read, it must set the class variable “finished” to True.
set_last_position(last_position)¶
Called from the manager. It updates the last position of data committed by the writer, so that an interrupted export can be resumed.
supported_options = {'count': {'default': 0, 'type': (<type 'int'>, <type 'long'>)}, 'secondary_collections': {'default': [], 'type': <class 'exporters.utils.list[unicode]'>}, 'apikey': {'type': (<type 'basestring'>,), 'env_fallback': 'EXPORTERS_HS_APIKEY'}, 'collection_name': {'type': (<type 'basestring'>,)}, 'startts': {'default': None, 'type': (<type 'int'>, <type 'long'>, <type 'basestring'>)}, 'batch_size': {'default': 10000, 'type': (<type 'int'>, <type 'long'>)}, 'prefixes': {'default': [], 'type': <class 'exporters.utils.list[unicode]'>}, 'exclude_prefixes': {'default': [], 'type': <class 'exporters.utils.list[unicode]'>}, 'endts': {'default': None, 'type': (<type 'int'>, <type 'long'>, <type 'basestring'>)}, 'project_id': {'type': (<type 'int'>, <type 'long'>, <type 'basestring'>)}}¶
class exporters.readers.KafkaScannerReader(*args, **kwargs)¶
Bases: exporters.readers.base_reader.BaseReader
This reader retrieves items from Kafka brokers.
- batch_size (int): Number of items to be returned in each batch.
- brokers (list): List of broker URIs.
- topic (str): Topic to read from.
- partitions (list): Partitions to read from.
get_next_batch()¶
Called from the manager. It must return a list or a generator of BaseRecord objects; when there is nothing left to read, it must set the class variable “finished” to True.
set_last_position(last_position)¶
Called from the manager. It updates the last position of data committed by the writer, so that an interrupted export can be resumed.
supported_options = {'batch_size': {'default': 10000, 'type': (<type 'int'>, <type 'long'>)}, 'topic': {'type': (<type 'basestring'>,)}, 'ssl_configs': {'default': None, 'type': <type 'dict'>}, 'brokers': {'type': <class 'exporters.utils.list[unicode]'>}, 'partitions': {'default': None, 'type': <class 'exporters.utils.list[int]'>}}¶
class exporters.readers.KafkaRandomReader(*args, **kwargs)¶
Bases: exporters.readers.base_reader.BaseReader
This reader retrieves a random subset of items from Kafka brokers.
- record_count (int): Number of items to be returned in total.
- batch_size (int): Number of items to be returned in each batch.
- brokers (list): List of broker URIs.
- topic (str): Topic to read from.
- group (str): Consumer group for the Kafka client.
decompress_messages(offmsgs)¶
Decompresses the predefined compressed fields of each message. Messages should be unpacked before this step.
get_next_batch()¶
Called from the manager. It must return a list or a generator of BaseRecord objects; when there is nothing left to read, it must set the class variable “finished” to True.
set_last_position(last_position)¶
Called from the manager. It updates the last position of data committed by the writer, so that an interrupted export can be resumed.
supported_options = {'record_count': {'type': (<type 'int'>, <type 'long'>)}, 'group': {'type': (<type 'basestring'>,)}, 'batch_size': {'default': 10000, 'type': (<type 'int'>, <type 'long'>)}, 'topic': {'type': (<type 'basestring'>,)}, 'brokers': {'type': <class 'exporters.utils.list[unicode]'>}}¶
class exporters.readers.FSReader(*args, **kwargs)¶
Bases: exporters.readers.base_stream_reader.StreamBasedReader
Reads items from gzip-compressed files under a common path on the filesystem.
- input (str/dict, or a list of str/dict): Specification of the files to be read. Accepts either one “input_unit” or a list of them. An “input_unit” is defined as follows:
  - If a string, it indicates a filename, e.g. “/path/to/filename”.
  - If a dictionary, it indicates a directory to be read, with the following elements:
    - “dir”: path to the directory, e.g. “/path/to/dir”.
    - “dir_pointer”: path to a file containing the path to a directory, e.g. “/path/to/pointer/file”, which contains “/path/to/dir”. Cannot be used together with “dir”. For example, with a weekly export scheduled via cron, pointing to a new data path every week would require updating the export configuration each time. With a pointer, the reader reads from that file, which contains one line with a valid path to datasets, so only the pointer file needs to be updated.
    - “pattern”: (optional) regular expression to filter filenames, e.g. “output.*.jl.gz$”.
supported_options = {'input': {'default': {'dir': ''}, 'type': (<type 'str'>, <type 'dict'>, <type 'list'>)}, 'batch_size': {'default': 10000, 'type': (<type 'int'>, <type 'long'>)}}¶
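A hedged reader section for the filesystem reader, assuming the submodule path exporters.readers.fs_reader (not shown on this page); the paths are placeholders and illustrate the three “input_unit” forms described above:

    # Reader section of a hypothetical export configuration (placeholder values).
    reader_config = {
        "name": "exporters.readers.fs_reader.FSReader",
        "options": {
            "input": [
                "/path/to/filename.jl.gz",                 # a single file
                {"dir": "/path/to/dir",                    # a directory to scan...
                 "pattern": "output.*.jl.gz$"},            # ...filtered by a regex
                {"dir_pointer": "/path/to/pointer/file"}   # file whose content is the directory path
            ],
            "batch_size": 10000
        }
    }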