Source code for exporters.writers.s3_writer

import os
from collections import Counter
from contextlib import closing, contextmanager
import six
from exporters.default_retries import retry_long
from exporters.progress_callback import BotoDownloadProgress
from exporters.utils import CHUNK_SIZE, split_file, calculate_multipart_etag, get_bucket_name, \
                            get_boto_connection
from exporters.writers.base_writer import InconsistentWriteState
from exporters.writers.filebase_base_writer import FilebaseBaseWriter


DEFAULT_BUCKET_REGION = 'us-east-1'


@contextmanager
def multipart_upload(bucket, key_name, **kwargs):
    mp = bucket.initiate_multipart_upload(key_name, **kwargs)
    try:
        yield mp
        mp.complete_upload()
    except:
        mp.cancel_upload()
        raise
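
# Usage sketch (illustrative, not part of the original module), assuming a
# boto bucket object and this module's split_file helper: on a clean exit the
# context manager completes the upload; on any exception it cancels it, so no
# stale partial uploads are left behind.
#
#     with multipart_upload(bucket, 'exports/items.jl.gz') as mp:
#         for chunk in split_file('/tmp/items.jl.gz'):
#             mp.upload_part_from_file(chunk.bytes, part_num=chunk.number)
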
def should_use_multipart_upload(path, bucket):
    from boto.exception import S3ResponseError
    # We need to check if we have READ permissions on this bucket, as they are
    # needed to perform the complete_upload operation.
    try:
        acl = bucket.get_acl()
        for grant in acl.acl.grants:
            if grant.permission == 'READ':
                break
        else:
            # No READ grant was found, so fall back to a regular upload.
            return False
    except S3ResponseError:
        return False
    return os.path.getsize(path) > CHUNK_SIZE
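
# Background sketch (an assumption about exporters.utils, for illustration
# only): calculate_multipart_etag, used below for large uploads, presumably
# mirrors how S3 builds multipart ETags: the MD5 of the concatenated per-part
# MD5 digests, suffixed with the part count. A minimal standalone version:
#
#     import hashlib
#
#     def multipart_etag_sketch(path, chunk_size=CHUNK_SIZE):
#         digests = []
#         with open(path, 'rb') as f:
#             for part in iter(lambda: f.read(chunk_size), b''):
#                 digests.append(hashlib.md5(part).digest())
#         return '"{}-{}"'.format(
#             hashlib.md5(b''.join(digests)).hexdigest(), len(digests))
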
class S3Writer(FilebaseBaseWriter):
    """
    Writes items to an S3 bucket. It is a file-based writer, so the filebase
    option is available.

        - bucket (str)
            Name of the bucket to write the items to.

        - aws_access_key_id (str)
            Public access key to the s3 bucket.

        - aws_secret_access_key (str)
            Secret access key to the s3 bucket.

        - filebase (str)
            Base path to store the items in the bucket.

        - aws_region (str)
            AWS region to connect to.

        - save_metadata (bool)
            Save key's items count as metadata. Default: True
    """
    supported_options = {
        'bucket': {'type': six.string_types},
        'aws_access_key_id': {
            'type': six.string_types,
            'env_fallback': 'EXPORTERS_S3WRITER_AWS_LOGIN'
        },
        'aws_secret_access_key': {
            'type': six.string_types,
            'env_fallback': 'EXPORTERS_S3WRITER_AWS_SECRET'
        },
        'aws_region': {'type': six.string_types, 'default': None},
        'host': {'type': six.string_types, 'default': None},
        'save_pointer': {'type': six.string_types, 'default': None},
        'save_metadata': {'type': bool, 'default': True, 'required': False}
    }

    def __init__(self, options, *args, **kwargs):
        super(S3Writer, self).__init__(options, *args, **kwargs)
        access_key = self.read_option('aws_access_key_id')
        secret_key = self.read_option('aws_secret_access_key')
        self.aws_region = self.read_option('aws_region')
        self.host = self.read_option('host')
        bucket_name = get_bucket_name(self.read_option('bucket'))
        self.logger.info('Starting S3Writer for bucket: %s' % bucket_name)
        if self.aws_region is None:
            self.aws_region = self._get_bucket_location(access_key, secret_key,
                                                        bucket_name)
        self.conn = get_boto_connection(access_key, secret_key, self.aws_region,
                                        bucket_name, self.host)
        self.bucket = self.conn.get_bucket(bucket_name, validate=False)
        self.save_metadata = self.read_option('save_metadata')
        self.set_metadata('files_counter', Counter())
        self.set_metadata('keys_written', [])

    def _get_bucket_location(self, access_key, secret_key, bucket):
        try:
            conn = get_boto_connection(access_key, secret_key,
                                       bucketname=bucket, host=self.host)
            return conn.get_bucket(bucket).get_location() or DEFAULT_BUCKET_REGION
        except:
            return DEFAULT_BUCKET_REGION

    def _update_metadata(self, dump_path, key_name):
        buffer_info = self.write_buffer.get_metadata(dump_path)
        key_info = {
            'key_name': key_name,
            'size': buffer_info['size'],
            'number_of_records': buffer_info['number_of_records']
        }
        keys_written = self.get_metadata('keys_written')
        keys_written.append(key_info)
        self.set_metadata('keys_written', keys_written)

    def _get_total_count(self, dump_path):
        return self.write_buffer.get_metadata_for_file(
            dump_path, 'number_of_records') or 0

    def _ensure_proper_key_permissions(self, key):
        from boto.exception import S3ResponseError
        try:
            key.set_acl('bucket-owner-full-control')
        except S3ResponseError:
            self.logger.warning('We have no READ_ACP/WRITE_ACP permissions')

    def _create_key_metadata(self, dump_path, md5=None):
        from boto.utils import compute_md5
        metadata = {}
        metadata['total'] = self._get_total_count(dump_path)
        if md5:
            metadata['md5'] = md5
        else:
            with open(dump_path, 'r') as f:
                metadata['md5'] = compute_md5(f)
        return metadata

    def _save_metadata_for_key(self, key, dump_path, md5=None):
        from boto.exception import S3ResponseError
        metadata = self._create_key_metadata(dump_path, md5)
        try:
            for k, v in metadata.items():
                key.set_metadata(k, v)
        except S3ResponseError:
            self.logger.warning(
                'We have no READ_ACP/WRITE_ACP permissions, '
                'so we could not add metadata info')

    def _upload_small_file(self, dump_path, key_name):
        with closing(self.bucket.new_key(key_name)) as key, \
                open(dump_path, 'r') as f:
            buffer_info = self.write_buffer.get_metadata(dump_path)
            md5 = key.get_md5_from_hexdigest(buffer_info['file_hash'])
            if self.save_metadata:
                self._save_metadata_for_key(key, dump_path, md5)
            progress = BotoDownloadProgress(self.logger)
            key.set_contents_from_file(f, cb=progress, md5=md5)
            self._ensure_proper_key_permissions(key)

    @retry_long
    def _upload_chunk(self, mp, chunk):
        mp.upload_part_from_file(chunk.bytes, part_num=chunk.number)

    def _upload_large_file(self, dump_path, key_name):
        self.logger.debug('Using multipart S3 uploader')
        md5 = None
        if self.save_metadata:
            md5 = calculate_multipart_etag(dump_path, CHUNK_SIZE)
        metadata = self._create_key_metadata(dump_path, md5=md5)
        with multipart_upload(self.bucket, key_name, metadata=metadata) as mp:
            for chunk in split_file(dump_path):
                self._upload_chunk(mp, chunk)
                self.logger.debug(
                    'Uploaded chunk number {}'.format(chunk.number))

    def _write_s3_key(self, dump_path, key_name):
        destination = 's3://{}/{}'.format(self.bucket.name, key_name)
        self.logger.info('Start uploading {} to {}'.format(dump_path, destination))
        if should_use_multipart_upload(dump_path, self.bucket):
            self._upload_large_file(dump_path, key_name)
        else:
            self._upload_small_file(dump_path, key_name)
        self.last_written_file = destination
        self.logger.info('Saved {}'.format(destination))
    def write(self, dump_path, group_key=None, file_name=None):
        if group_key is None:
            group_key = []
        filebase_path, file_name = self.create_filebase_name(group_key,
                                                             file_name=file_name)
        key_name = filebase_path + '/' + file_name
        self._write_s3_key(dump_path, key_name)
        self._update_metadata(dump_path, key_name)
        self.get_metadata('files_counter')[filebase_path] += 1
    @retry_long
    def _write_s3_pointer(self, save_pointer, filebase):
        with closing(self.bucket.new_key(save_pointer)) as key:
            key.set_contents_from_string(filebase)

    def _update_last_pointer(self):
        save_pointer = self.read_option('save_pointer')
        self._write_s3_pointer(save_pointer, self.filebase.dirname_template + '/')
    def close(self):
        """
        Called to clean all possible tmp files created during the process.
        """
        if self.read_option('save_pointer'):
            self._update_last_pointer()
        super(S3Writer, self).close()
    def get_file_suffix(self, path, prefix):
        number_of_keys = self.get_metadata('files_counter').get(path, 0)
        suffix = '{}'.format(str(number_of_keys))
        return suffix
    def _check_write_consistency(self):
        from boto.exception import S3ResponseError
        for key_info in self.get_metadata('keys_written'):
            try:
                key = self.bucket.get_key(key_info['key_name'])
                if not key:
                    raise InconsistentWriteState(
                        'Key {} not found in bucket'.format(key_info['key_name']))
                if str(key.content_length) != str(key_info['size']):
                    raise InconsistentWriteState(
                        'Key {} has unexpected size. (expected {} - got {})'.format(
                            key_info['key_name'], key_info['size'],
                            key.content_length))
                if self.save_metadata:
                    if str(key.get_metadata('total')) != \
                            str(key_info['number_of_records']):
                        raise InconsistentWriteState(
                            'Unexpected number of records for key {}. ('
                            'expected {} - got {})'.format(
                                key_info['key_name'],
                                key_info['number_of_records'],
                                key.get_metadata('total')))
            except S3ResponseError:
                self.logger.warning(
                    'Skipping consistency check for key {}. Probably due to lack of '
                    'read permissions'.format(key_info['key_name']))
        self.logger.info('Consistency check passed')
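
# Example configuration (an illustrative sketch, not from the original
# source): a writer section wiring up the options declared in
# supported_options above. The bucket, filebase and credential values are
# hypothetical placeholders; both AWS keys can instead come from the
# EXPORTERS_S3WRITER_AWS_LOGIN / EXPORTERS_S3WRITER_AWS_SECRET environment
# fallbacks.
#
#     writer_config = {
#         'name': 'exporters.writers.s3_writer.S3Writer',
#         'options': {
#             'bucket': 'my-export-bucket',
#             'aws_access_key_id': 'AKIA...',
#             'aws_secret_access_key': '<secret>',
#             'filebase': 'exports/%Y-%m-%d/',
#             'save_pointer': 'exports/LAST',
#             'save_metadata': True
#         }
#     }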