File size: 2,869 Bytes
2abfccb |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
import logging
from petrel_client.ceph.ceph import Ceph
from petrel_client.common.io_profile import profile
from petrel_client.common.exception import ObjectNotFoundError
from petrel_client.ceph.librgw import rgw
# Module-level logger, named after this module, used by RGWClient below.
LOG = logging.getLogger(__name__)
class RGWClient(Ceph):
    """Ceph client that reads and writes objects through ``rgw.LibRGWFS``.

    Buckets are opened as directories under the mounted root and objects are
    opened as files inside a bucket directory.
    """

    # Byte-size units; ``mb`` is used for the default read size of ``get``.
    kb = 1024
    mb = 1024 * kb

    def __init__(self, cluster, conf, *args, **kwargs):
        """Create the client and mount the librgw file system.

        Args:
            cluster: Cluster identifier; logged and forwarded to ``Ceph``.
            conf: Mapping-like configuration. The keys ``conf``, ``keyring``,
                ``name``, ``cluster``, ``access_key`` and ``secret_key`` are
                read with ``conf[...]`` and must be present; ``uid`` is
                optional and defaults to ``'user_id'``.
            *args: Forwarded to ``Ceph.__init__``.
            **kwargs: Forwarded to ``Ceph.__init__``.

        Raises:
            Exception: If the librgw connection cannot be established.
        """
        LOG.debug('init RGWClient(%s)', cluster)
        super(RGWClient, self).__init__(cluster, conf, *args, **kwargs)

        conn_args = {
            'conf': conf['conf'],
            'keyring': conf['keyring'],
            'name': conf['name'],
            'cluster': conf['cluster'],
        }
        uid = conf.get('uid', 'user_id')

        # Cache of bucket name -> open bucket directory handle, filled by get().
        self.bucket_fs = {}
        self._init_librgw(uid, conf['access_key'],
                          conf['secret_key'], conn_args)

    def _init_librgw(self, uid=None, key=None, secret=None, connection_kwargs=None):
        """Instantiate ``rgw.LibRGWFS`` and mount its root.

        Sets ``self.client`` and ``self.root_fs`` on success.

        Args:
            uid: User id passed to ``rgw.LibRGWFS``.
            key: Access key passed to ``rgw.LibRGWFS``.
            secret: Secret key passed to ``rgw.LibRGWFS``.
            connection_kwargs: Optional dict of extra keyword arguments for
                ``rgw.LibRGWFS``; ``None`` means no extra arguments.

        Raises:
            Exception: Wrapping any error raised while connecting or mounting.
        """
        try:
            # ``**None`` is a TypeError, so honour the declared None default.
            self.client = rgw.LibRGWFS(
                uid, key, secret, **(connection_kwargs or {}))
            self.root_fs = self.client.mount()
            LOG.debug("The connection bulid successfully.")
        except Exception as e:
            LOG.error("The input parameters is invalid. %s", e)
            raise Exception("The input parameters is invalid.", e)

    @profile('get')
    def get(self, cluster, bucket, key, file_size=4 * mb, **kwargs):
        """Read an object from a bucket and return its content.

        Args:
            cluster: Unused by this method.
            bucket: Name of the bucket directory under the mounted root.
            key: Name of the object inside the bucket.
            file_size: Passed to ``client.read`` as the third argument after
                a start value of 0; presumably the maximum number of bytes
                to read (verify against the librgw binding).
            **kwargs: Ignored.

        Returns:
            Whatever ``client.read`` returns for the object.

        Raises:
            ObjectNotFoundError: If librgw raises ``rgw.ObjectNotFound``.
        """
        try:
            bucket_fs = self.bucket_fs.get(bucket)
            if bucket_fs is None:
                bucket_fs = self.client.opendir(self.root_fs, bucket)
                self.bucket_fs[bucket] = bucket_fs

            file_fs = self.client.open(bucket_fs, key)
            try:
                return self.client.read(file_fs, 0, file_size)
            finally:
                # Always release the per-object handle, even if read() fails.
                self.client.close(file_fs)
            # The bucket handle is intentionally NOT closed here: it lives in
            # self.bucket_fs and is reused by later calls. Closing it would
            # leave a dead handle in the cache.
        except rgw.ObjectNotFound as err:
            raise ObjectNotFoundError(err)

    def put(self, cluster, bucket, key, body, **kwargs):
        """Write ``body`` to an object, creating the object if necessary.

        Args:
            cluster: Unused by this method.
            bucket: Name of the bucket directory under the mounted root.
            key: Name of the object inside the bucket.
            body: Data passed to ``client.write`` with a start value of 0.
            **kwargs: Ignored.

        Returns:
            True once the write has completed and the handles are closed.
        """
        # put() uses its own bucket handle (not the get() cache) and closes it.
        bucket_fs = self.client.opendir(self.root_fs, bucket)
        try:
            try:
                file_fs = self.client.create(bucket_fs, key)
            except rgw.ObjectExists:
                # The object already exists: open it and overwrite from 0.
                file_fs = self.client.open(bucket_fs, key)
            try:
                self.client.write(file_fs, 0, body)
            finally:
                self.client.close(file_fs)
        finally:
            self.client.close(bucket_fs)
        return True
|