# Source code for exporters.writers.azure_blob_writer

import re
import warnings
import six
from base64 import b64encode
from binascii import unhexlify
from exporters.default_retries import retry_long
from exporters.writers.base_writer import BaseWriter, InconsistentWriteState


class AzureBlobWriter(BaseWriter):
    """
    Writes items to azure blob containers.

        - account_name (str)
            Access name of the azure account. Falls back to the
            EXPORTERS_AZUREWRITER_NAME environment variable.

        - account_key (str)
            Access key of the azure account. Falls back to the
            EXPORTERS_AZUREWRITER_KEY environment variable.

        - container (str)
            Blob container name. It is created on initialisation.
    """
    supported_options = {
        'account_name': {'type': six.string_types, 'env_fallback': 'EXPORTERS_AZUREWRITER_NAME'},
        'account_key': {'type': six.string_types, 'env_fallback': 'EXPORTERS_AZUREWRITER_KEY'},
        'container': {'type': six.string_types},
    }

    # Algorithm for the buffer's 'file_hash' (presumably read by BaseWriter /
    # the write buffer -- confirm). It must be md5 because it is compared with
    # the blob's content_md5 in _check_write_consistency.
    hash_algorithm = 'md5'

    # 3-63 chars of lowercase letters, digits and hyphens, starting and ending
    # with a letter or digit. Anchored at both ends so re.match cannot accept
    # a merely valid prefix. Consecutive hyphens are rejected separately in
    # __init__.
    VALID_CONTAINER_NAME_RE = r'\A[a-z0-9][a-z0-9-]{1,61}[a-z0-9]\Z'

    def __init__(self, *args, **kw):
        # Imported here rather than at module level (presumably so the Azure
        # SDK is only needed when this writer is actually used).
        from azure.storage.blob import BlockBlobService
        super(AzureBlobWriter, self).__init__(*args, **kw)
        account_name = self.read_option('account_name')
        account_key = self.read_option('account_key')
        self.container = self.read_option('container')
        if '--' in self.container or not re.match(self.VALID_CONTAINER_NAME_RE, self.container):
            # Only a warning: the name is still passed to Azure as-is.
            help_url = ('https://azure.microsoft.com/en-us/documentation'
                        '/articles/storage-python-how-to-use-blob-storage/')
            warnings.warn("Container name %s doesn't conform with naming rules (see: %s)"
                          % (self.container, help_url))
        self.azure_service = BlockBlobService(account_name, account_key)
        self.azure_service.create_container(self.container)
        self.logger.info('AzureBlobWriter has been initiated. '
                         'Writing to container {}'.format(self.container))
        self.set_metadata('files_counter', 0)
        self.set_metadata('blobs_written', [])

    def write(self, dump_path, group_key=None):
        """Upload the file at ``dump_path`` as a blob and record its metadata.

        ``group_key`` is accepted for interface compatibility and is not used.
        """
        self.logger.info('Start uploading {} to {}'.format(dump_path, self.container))
        self._write_blob(dump_path)
        # Bookkeeping happens outside the retried upload so that a failure here
        # does not cause the blob to be uploaded again.
        self._update_metadata(dump_path, self._blob_name_for(dump_path))
        self.set_metadata('files_counter', self.get_metadata('files_counter') + 1)

    @staticmethod
    def _blob_name_for(dump_path):
        """Return the blob name for ``dump_path``: its last '/'-separated component."""
        return dump_path.split('/')[-1]

    @retry_long
    def _write_blob(self, dump_path):
        """Upload ``dump_path`` into the container (network I/O only)."""
        blob_name = self._blob_name_for(dump_path)
        self.azure_service.create_blob_from_path(
            self.container,
            blob_name,
            dump_path,
            max_connections=5,
        )
        self.logger.info('Saved {}'.format(blob_name))

    def _update_metadata(self, dump_path, blob_name):
        """Append size, hash and record count of an uploaded file to 'blobs_written'."""
        buffer_info = self.write_buffer.get_metadata(dump_path)
        file_info = {
            'blob_name': blob_name,
            'size': buffer_info['size'],
            # The buffer's hex digest is converted to base64 so it can be
            # compared with the blob's content_md5. b64encode returns bytes on
            # Python 3; decode to text, otherwise str() in the consistency
            # check yields "b'...'" and never matches.
            'hash': b64encode(unhexlify(buffer_info['file_hash'])).decode('ascii'),
            'number_of_records': buffer_info['number_of_records'],
        }
        self.get_metadata('blobs_written').append(file_info)

    def _check_write_consistency(self):
        """Verify that every recorded blob exists with the expected size and MD5.

        Raises:
            InconsistentWriteState: if a blob is missing, or its size or hash
                differs from the values recorded at upload time.
        """
        from azure.common import AzureMissingResourceHttpError
        for blob_info in self.get_metadata('blobs_written'):
            try:
                blob = self.azure_service.get_blob_properties(
                    self.container, blob_info['blob_name'])
            except AzureMissingResourceHttpError:
                raise InconsistentWriteState('Missing blob {}'.format(blob_info['blob_name']))
            blob_size = blob.properties.content_length
            blob_md5 = blob.properties.content_settings.content_md5
            if str(blob_size) != str(blob_info['size']):
                raise InconsistentWriteState(
                    'File {} has unexpected size. (expected {} - got {})'.format(
                        blob_info['blob_name'], blob_info['size'], blob_size
                    )
                )
            if str(blob_md5) != str(blob_info['hash']):
                raise InconsistentWriteState(
                    'File {} has unexpected hash. (expected {} - got {})'.format(
                        blob_info['blob_name'], blob_info['hash'], blob_md5
                    )
                )
        self.logger.info('Consistency check passed')