|
|
|
|
|
|
|
import re |
|
from petrel_client.common.exception import InvalidClusterNameError, InvalidS3UriError, NoDefaultClusterNameError |
|
from petrel_client.client_base import ClientBase |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Matches "[<cluster>:]s3://<bucket>[/<key>]" (case-insensitively).
#   cluster - optional prefix before the first ':'
#   bucket  - everything after "s3://" up to the next '/'
#   key     - optional remainder of the URI; a trailing '/' is kept in the key
# The `key` group is None when the URI stops at the bucket.
_S3_URI_PATTERN = re.compile(
    r'^(?:(?P<cluster>[^:]+):)?s3://(?P<bucket>[^/]+)/?(?P<key>(?:.+?)/?$)?',
    re.IGNORECASE,
)
|
|
|
|
|
class Ceph(ClientBase):
    """Ceph storage client front-end.

    Provides S3-URI parsing (`parse_uri`), a factory that picks the concrete
    backend client from the configuration (`create`), and the per-cluster
    cache flag (`enable_cache`).
    """

    @staticmethod
    def parse_uri(uri, ceph_dict, default_cluster=None):
        """Split an S3 URI into its parts and look up the owning client.

        Args:
            uri: String of the form ``[<cluster>:]s3://<bucket>[/<key>]``.
            ceph_dict: Mapping from cluster name to a client object exposing
                ``enable_cache()``.
            default_cluster: Cluster name used when `uri` carries no
                ``<cluster>:`` prefix.

        Returns:
            Tuple ``(cluster, bucket, key, enable_cache)``. ``key`` is None
            when the URI has no key part.

        Raises:
            InvalidS3UriError: `uri` does not match the S3 URI pattern.
            NoDefaultClusterNameError: `uri` has no cluster prefix and
                `default_cluster` is empty.
            InvalidClusterNameError: the resolved cluster is not a key of
                `ceph_dict`.
        """
        matched = _S3_URI_PATTERN.match(uri)
        if not matched:
            raise InvalidS3UriError(uri)

        cluster = matched.group('cluster') or default_cluster
        if not cluster:
            raise NoDefaultClusterNameError(uri)

        # Only the lookup is guarded: a KeyError raised from inside
        # enable_cache() must not be reported as an unknown cluster name.
        try:
            client = ceph_dict[cluster]
        except KeyError as err:
            raise InvalidClusterNameError(cluster) from err

        bucket = matched.group('bucket')
        key = matched.group('key')
        return cluster, bucket, key, client.enable_cache()

    @staticmethod
    def create(cluster, conf, *args, **kwargs):
        """Instantiate the backend client selected by `conf`.

        Selection order:
            1. ``fake`` is true               -> FakeClient
            2. ``boto`` is 'cpp' or 'c++'     -> S3CppClient
            3. ``boto`` is a true boolean     -> S3Client
            4. otherwise                      -> RGWClient

        Backend modules are imported lazily so that only the selected
        implementation has to be importable.

        Args:
            cluster: Cluster name handed to the backend client.
            conf: Configuration object providing ``get`` and ``get_boolean``.
            *args: Extra positional arguments forwarded to the backend
                (not forwarded to FakeClient).
            **kwargs: Extra keyword arguments forwarded to the backend.

        Returns:
            The newly constructed client instance.
        """
        fake = conf.get_boolean('fake')
        # NOTE(review): assumes conf.get('boto') returns a string - confirm.
        enable_s3_cpp = conf.get('boto').lower() in ('cpp', 'c++')
        # Short-circuit keeps get_boolean('boto') from parsing 'cpp'/'c++'.
        enable_boto = (not enable_s3_cpp) and conf.get_boolean('boto')
        # Anonymous access applies only when neither credential is configured.
        anonymous_access = (conf.get('access_key', None) is None) and (
            conf.get('secret_key', None) is None)

        if fake:
            from petrel_client.fake_client import FakeClient
            name = f'S3: {cluster}'
            return FakeClient(client_type='s3', conf=conf, name=name, **kwargs)

        if enable_s3_cpp:
            from petrel_client.ceph.s3cpp.s3_cpp_client import S3CppClient
            return S3CppClient(cluster, conf, anonymous_access, *args, **kwargs)

        if enable_boto:
            from petrel_client.ceph.s3.s3_client import S3Client
            return S3Client(cluster, conf, anonymous_access, *args, **kwargs)

        from petrel_client.ceph.librgw.rgw_client import RGWClient
        return RGWClient(cluster, conf, *args, **kwargs)

    def __init__(self, cluster, conf, *args, **kwargs):
        """Initialise the base client and record the cache setting.

        Args:
            cluster: Cluster name; passed to ClientBase as ``name``.
            conf: Configuration object; passed to ClientBase and queried for
                the ``enable_mc`` / ``enable_cache`` booleans.
            *args: Forwarded to ClientBase.
            **kwargs: Forwarded to ClientBase.
        """
        super(Ceph, self).__init__(*args, name=cluster, conf=conf, **kwargs)
        # Caching is on when either option is set; both default to False.
        self.__enable_cache = conf.get_boolean(
            'enable_mc', False) or conf.get_boolean('enable_cache', False)

    def enable_cache(self):
        """Return the cache flag computed from the configuration in __init__."""
        return self.__enable_cache
|
|