Source code for exporters.writers.gstorage_writer

import json
import os
import pkg_resources
import six

from base64 import b64encode
from binascii import unhexlify

from exporters.default_retries import retry_long
from exporters.writers.filebase_base_writer import FilebaseBaseWriter
from exporters.utils import TemporaryDirectory
from exporters.writers.base_writer import InconsistentWriteState


class GStorageWriter(FilebaseBaseWriter):
    """
    Writes items to Google Storage buckets. It is a File Based writer, so it
    has the filebase option available

        - filebase (str)
            Path to store the exported files

        - project (str)
            Valid project name

        - bucket (str)
            Google Storage bucket name

        - credentials (str or dict)
            Object with valid Google credentials. It can also be set through
            the env variable EXPORTERS_GSTORAGE_CREDS_RESOURCE, which should
            hold a reference to a credentials JSON file installed with
            setuptools. This reference should have the form
            "package_name:file_path"
    """
    supported_options = {
        'project': {'type': six.string_types},
        'bucket': {'type': six.string_types},
        'credentials': {
            'type': (dict,) + six.string_types,
            'env_fallback': 'EXPORTERS_GSTORAGE_CREDS_RESOURCE'
        }
    }

    def __init__(self, options, *args, **kwargs):
        from gcloud import storage
        super(GStorageWriter, self).__init__(options, *args, **kwargs)
        project = self.read_option('project')
        bucket_name = self.read_option('bucket')
        # gcloud needs the credentials as a JSON file on disk, so materialize
        # them into a temporary directory that is removed once the client is
        # built.
        with TemporaryDirectory() as temp_dir:
            credentials_file = os.path.join(temp_dir, 'credentials.json')
            with open(credentials_file, 'wb') as f:
                creds_opt = self.read_option('credentials')
                if isinstance(creds_opt, six.string_types):
                    # Credentials given as a "package_name:file_path"
                    # setuptools resource reference.
                    package_name, path = creds_opt.split(':')
                    try:
                        serialized = pkg_resources.resource_string(package_name, path)
                    except Exception:
                        self.write_buffer.close()
                        raise
                else:
                    # Credentials given inline as a dict.
                    serialized = json.dumps(creds_opt).encode('utf-8')
                f.write(serialized)
            client = storage.Client.from_service_account_json(credentials_file,
                                                              project=project)
        self.bucket = client.bucket(bucket_name)
        self.set_metadata('files_written', [])

    def _blob_url(self, bucket_name, blob_name):
        return 'https://storage.cloud.google.com/{}/{}'.format(bucket_name, blob_name)

    @retry_long
    def _write_gstorage_blob(self, dump_path, blob_name):
        destination = self._blob_url(self.bucket.name, blob_name)
        self.logger.info('Start uploading {} to {}'.format(dump_path, destination))

        # Open in binary mode: the dump may be compressed, and the upload
        # expects raw bytes.
        with open(dump_path, 'rb') as f:
            blob = self.bucket.blob(blob_name)
            blob.upload_from_file(f)
        self._update_metadata(dump_path, blob)

        self.logger.info('Saved {}'.format(destination))
    def write(self, dump_path, group_key=None, file_name=None):
        if group_key is None:
            group_key = []
        filebase_path, file_name = self.create_filebase_name(group_key, file_name=file_name)
        blob_name = filebase_path + '/' + file_name
        self._write_gstorage_blob(dump_path, blob_name)
        self.last_written_file = blob_name
    def write_stream(self, stream, file_obj):
        filebase_path, file_name = self.create_filebase_name([], file_name=stream.filename)
        blob_name = filebase_path + '/' + file_name
        blob = self.bucket.blob(blob_name)
        # Upload directly from the open stream; size tells the client how
        # many bytes to read from file_obj.
        blob.upload_from_file(file_obj, size=stream.size)
    def _update_metadata(self, dump_path, blob):
        buffer_info = self.write_buffer.get_metadata(dump_path)
        key_info = {
            'size': buffer_info['size'],
            'remote_size': blob.size,
            # Google Storage reports MD5 hashes base64-encoded, so convert
            # our hex digest to the same encoding before storing it.
            'hash': b64encode(unhexlify(buffer_info['file_hash'])).decode('ascii'),
            'remote_hash': blob.md5_hash,
            'title': blob.name,
        }
        self.get_metadata('files_written').append(key_info)

    def _check_write_consistency(self):
        for file_info in self.get_metadata('files_written'):
            if file_info['size'] != file_info['remote_size']:
                msg = 'Unexpected size of file {title}. Expected {size} - got {remote_size}'
                raise InconsistentWriteState(msg.format(**file_info))
            if file_info['hash'] != file_info['remote_hash']:
                msg = 'Unexpected hash of file {title}. Expected {hash} - got {remote_hash}'
                raise InconsistentWriteState(msg.format(**file_info))
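
For reference, below is a minimal sketch of how this writer could be
configured in an exporters pipeline. The project, bucket, filebase, and
credential values are placeholders, not defaults from this module, and the
surrounding "name"/"options" layout is assumed from the standard exporters
configuration format.

# Hypothetical configuration sketch; all values are placeholders.
writer_config = {
    'name': 'exporters.writers.gstorage_writer.GStorageWriter',
    'options': {
        'filebase': '/exports/%Y-%m-%d/items',  # placeholder path template
        'project': 'my-gcp-project',            # placeholder project name
        'bucket': 'my-export-bucket',           # placeholder bucket name
        # Credentials can be an inline service-account dict...
        'credentials': {'type': 'service_account', 'project_id': 'my-gcp-project'},
        # ...or a setuptools resource reference, also settable through the
        # EXPORTERS_GSTORAGE_CREDS_RESOURCE environment variable:
        # 'credentials': 'mypackage:credentials/gstorage.json',
    },
}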