Source code for exporters.writers.cloudsearch_writer

import gzip
import json
import six

import requests
from exporters.default_retries import retry_short, retry_long

from exporters.writers.base_writer import BaseWriter


CLOUDSEARCH_MAX_BATCH_SIZE = 5 * 1000 * 1000  # CloudSearch rejects document batches larger than 5 MB


def create_document_batches(jsonlines, id_field,
                            max_batch_size=CLOUDSEARCH_MAX_BATCH_SIZE):
    """Create batches in the expected AWS CloudSearch format, limiting
    the byte size per batch according to the given max_batch_size.

    See: http://docs.aws.amazon.com/cloudsearch/latest/developerguide/preparing-data.html
    """
    batch = []
    # Two bytes for the enclosing '[' and ']' of each batch.
    fixed_initial_size = 2

    def create_entry(line):
        try:
            record = json.loads(line)
        except ValueError:
            raise ValueError('Could not parse JSON from: %s' % line)
        key = record[id_field]
        return '{"type":"add","id":%s,"fields":%s}' % (json.dumps(key), line)

    current_size = fixed_initial_size
    for line in jsonlines:
        entry = create_entry(line)
        # One extra byte for the comma joining entries within a batch.
        entry_size = len(entry) + 1
        if max_batch_size > (current_size + entry_size):
            current_size += entry_size
            batch.append(entry)
        else:
            if batch:  # avoid yielding an empty '[]' batch
                yield '[' + ','.join(batch) + ']'
            batch = [entry]
            current_size = fixed_initial_size + entry_size
    if batch:
        yield '[' + ','.join(batch) + ']'
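
# Illustrative sketch (not part of the original module): what
# create_document_batches yields for two small JSON lines, using the default
# '_key' id field. The sample records below are invented for the demo.
#
#   >>> lines = ['{"_key": "1", "t": "foo"}', '{"_key": "2", "t": "bar"}']
#   >>> for b in create_document_batches(iter(lines), '_key'):
#   ...     print(b)
#   [{"type":"add","id":"1","fields":{"_key": "1", "t": "foo"}},{"type":"add","id":"2","fields":{"_key": "2", "t": "bar"}}]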


class CloudSearchWriter(BaseWriter):
    """This writer stores items in the Amazon Web Services CloudSearch
    service (https://aws.amazon.com/es/cloudsearch/).

        - endpoint_url
            Document Endpoint (e.g.:
            http://doc-movies-123456789012.us-east-1.cloudsearch.amazonaws.com)

        - id_field
            Field to use as identifier

        - access_key
            Public AWS access key.

        - secret_key
            Secret AWS access key.
    """
    supported_options = {
        'endpoint_url': {
            'type': six.string_types,
            'help': 'Document Endpoint'
                    ' (e.g.: http://doc-movies-123456789012.us-east-1.cloudsearch.amazonaws.com)'
        },
        'id_field': {
            'type': six.string_types,
            'help': 'Field to use as identifier',
            'default': '_key',
        },
        'access_key': {
            'type': six.string_types,
            'env_fallback': 'EXPORTERS_CLOUDSEARCH_ACCESS_KEY',
            'default': None,
        },
        'secret_key': {
            'type': six.string_types,
            'env_fallback': 'EXPORTERS_CLOUDSEARCH_SECRET_KEY',
            'default': None,
        },
    }

    def __init__(self, options, *args, **kwargs):
        super(CloudSearchWriter, self).__init__(options, *args, **kwargs)
        self.endpoint_url = self.read_option('endpoint_url').rstrip('/')
        self.access_key = self.read_option('access_key')
        self.secret_key = self.read_option('secret_key')
        self.id_field = self.read_option('id_field')

    @retry_short
    def _post_document_batch(self, batch):
        """Send a batch to the CloudSearch endpoint.

        See: http://docs.aws.amazon.com/cloudsearch/latest/developerguide/submitting-doc-requests.html
        """  # noqa
        url = self.endpoint_url + '/2013-01-01/documents/batch'
        response = requests.post(
            url, data=batch, headers={'Content-type': 'application/json'})
        # Raise on HTTP error statuses so that @retry_short can retry failed posts.
        response.raise_for_status()
        return response

    @retry_long
    def write(self, dump_path, group_key=None):
        with gzip.open(dump_path) as f:
            for batch in create_document_batches(iter(f), self.id_field):
                self.logger.info('Starting upload batch with size %d' % len(batch))
                self._post_document_batch(batch)
                self.logger.info('Uploaded batch with size %d' % len(batch))
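

# ---------------------------------------------------------------------------
# Minimal usage sketch, not part of the original module: it exercises only
# the batching logic with in-memory sample records. No CloudSearch endpoint
# is contacted, and the records and '_key' values are invented for the demo.
if __name__ == '__main__':
    sample_lines = [
        '{"_key": "1", "title": "The Shawshank Redemption"}',
        '{"_key": "2", "title": "The Godfather"}',
    ]
    # Both records fit well under the 5 MB limit, so a single batch is printed.
    for batch in create_document_batches(iter(sample_lines), '_key'):
        print(batch)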