File size: 4,300 Bytes
2abfccb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import functools
from urllib.parse import urlparse
import logging
import hashlib

from petrel_client.common.io_profile import profile
from petrel_client.ceph.s3cpp.pys3client import PyS3Client, S3Error, init_api, shutdown_api
from petrel_client.ceph.ceph import Ceph
from petrel_client.common import exception

# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Maps the ``error_name`` of an ``S3Error`` to the petrel exception class that
# ``wrap_error`` raises in its place. Error names not listed here are reported
# as ``exception.S3ClientError`` instead.
EXCEPTION_MAP = {
    'ACCESS_DENIED': exception.AccessDeniedError,
    'NO_SUCH_BUCKET': exception.NoSuchBucketError,
    'NO_SUCH_KEY': exception.NoSuchKeyError,
    'RESOURCE_NOT_FOUND': exception.ResourceNotFoundError,
    'SIGNATURE_DOES_NOT_MATCH': exception.SignatureNotMatchError,
    'INVALID_ACCESS_KEY_ID': exception.InvalidAccessKeyError,
    'NETWORK_CONNECTION': exception.NetworkConnectionError,
}

# Shared ``S3CppEnv`` instance; stays ``None`` until ``get_s3_cpp_env`` creates it.
S3_CPP_ENV = None


class S3CppEnv(object):
    """Ties the lifetime of the S3 C++ API to a Python object.

    Constructing an instance calls ``init_api``; finalizing it calls
    ``shutdown_api``, but only if ``init_api`` completed successfully.
    """

    def __init__(self, log_level):
        """Initialise the S3 C++ API.

        Args:
            log_level: Value forwarded unchanged to ``init_api``.
        """
        # Set the flag before anything can fail: if init_api raises, __del__
        # still runs on the half-built instance and must not shut down an
        # API that was never initialised.
        self._initialized = False
        LOG.debug('S3CppEnv init')
        init_api(log_level)
        self._initialized = True

    def __del__(self):
        # No LOG.debug here: logging during the __del__ stage raises.
        if getattr(self, '_initialized', False):
            self._initialized = False
            shutdown_api()


def get_s3_cpp_env(log_level):
    """Return the shared ``S3CppEnv``, creating it on first use.

    Args:
        log_level: Passed to ``S3CppEnv`` only when the shared instance does
            not exist yet; ignored on later calls.

    Returns:
        The module-wide ``S3CppEnv`` instance stored in ``S3_CPP_ENV``.
    """
    global S3_CPP_ENV
    if S3_CPP_ENV is not None:
        return S3_CPP_ENV
    S3_CPP_ENV = S3CppEnv(log_level)
    return S3_CPP_ENV


def wrap_error(fn):
    """Decorate a client method so ``S3Error`` becomes a petrel exception.

    The wrapped callable must accept ``(self, cluster, bucket, key, ...)``.
    When it raises ``S3Error``:

    * if ``error_name`` is a key of ``EXCEPTION_MAP``, the mapped exception
      class is instantiated with ``(cluster, bucket, key)``;
    * otherwise ``exception.S3ClientError`` is built from ``error_name`` and,
      when it is non-empty, ``error_message``.

    The replacement keeps the original traceback and is raised with its
    context suppressed (``from None``). Any other outcome passes through.
    """
    @functools.wraps(fn)
    def wrapper(self, cluster, bucket, key, *args, **kwargs):
        try:
            return fn(self, cluster, bucket, key, *args, **kwargs)
        except S3Error as s3_err:
            mapped_cls = EXCEPTION_MAP.get(s3_err.error_name)
            if mapped_cls:
                translated = mapped_cls(cluster, bucket, key)
            elif s3_err.error_message:
                translated = exception.S3ClientError(
                    s3_err.error_name, s3_err.error_message)
            else:
                translated = exception.S3ClientError(s3_err.error_name)

            # Reuse the traceback of the low-level error so the failing
            # call site is still visible on the translated exception.
            raise translated.with_traceback(s3_err.__traceback__) from None

    return wrapper


class S3CppClient(Ceph):
    """``Ceph`` client that talks to S3 through the C++ ``PyS3Client`` binding."""

    def __init__(self, cluster, conf, anonymous_access, *args, **kwargs):
        """Create the underlying ``PyS3Client`` from *conf*.

        Args:
            cluster: Cluster identifier; stored and forwarded to ``Ceph``.
            conf: Configuration object. ``endpoint_url`` is required;
                ``access_key`` and ``secret_key`` are read unless
                *anonymous_access* is true. ``verify_ssl`` (default
                ``False``) and ``s3_cpp_log_level`` are read as well.
            anonymous_access: When true, empty credentials are passed to
                the client.
            *args: Forwarded to ``Ceph.__init__``.
            **kwargs: Forwarded to ``Ceph.__init__``.
        """
        # If initialization raises, __del__ is still called; assign these
        # first so that __del__ does not hit a logic error.
        self._client = None
        self._env = None

        endpoint_url = conf['endpoint_url']
        # urlparse only fills in netloc when a scheme is present, so default
        # to plain HTTP for bare "host:port" endpoints.
        if '://' not in endpoint_url:
            endpoint_url = 'http://' + endpoint_url
        parse_result = urlparse(endpoint_url)
        s3_args = {
            # In the AWS C++ SDK, empty ak and sk mean anonymous access.
            'ak': b'' if anonymous_access else conf['access_key'].encode('utf-8'),
            'sk': b'' if anonymous_access else conf['secret_key'].encode('utf-8'),

            'endpoint': parse_result.netloc.encode('utf-8'),
            'enable_https': parse_result.scheme == 'https',
            'verify_ssl': conf.get_boolean('verify_ssl', False),
            'use_dual_stack': False,
        }

        super(S3CppClient, self).__init__(cluster, conf, *args, **kwargs)
        self._cluster = cluster
        self._conf = conf

        s3_cpp_log_level = conf.get('s3_cpp_log_level')
        # Obtain the shared environment before constructing the client.
        self._env = get_s3_cpp_env(s3_cpp_log_level)
        self._client = PyS3Client(**s3_args)

    def __del__(self):
        # Drop the client reference first, then the environment reference.
        del self._client
        del self._env

    @staticmethod
    def _to_bytes(value):
        """Return *value* UTF-8 encoded if it is a ``str``, else unchanged."""
        return value.encode('utf-8') if isinstance(value, str) else value

    @profile('get')
    @wrap_error
    def get_with_info(self, cluster, bucket, key, **kwargs):
        """Fetch an object and optional metadata about it.

        Args:
            cluster: Not used by the body itself; ``wrap_error`` passes it
                to the exception it builds on failure.
            bucket: Bucket name, ``str`` (encoded as UTF-8) or bytes.
            key: Object key, ``str`` (encoded as UTF-8) or bytes.
            **kwargs: ``enable_md5`` adds an MD5 hex digest to the info
                dict. ``enable_stream`` and ``enable_etag`` are not
                supported when truthy.

        Returns:
            ``(data, info)`` where ``data`` is what ``get_object`` returned
            and ``info`` contains ``'md5'`` only when ``enable_md5`` is set.

        Raises:
            NotImplementedError: If an unsupported option is enabled.
        """
        info = {}

        unsupported_ops = [k for k, v in kwargs.items() if k in (
            'enable_stream', 'enable_etag') and v]
        if unsupported_ops:
            raise NotImplementedError(unsupported_ops)

        data = self._client.get_object(
            self._to_bytes(bucket), self._to_bytes(key))

        if kwargs.get('enable_md5', False):
            info['md5'] = hashlib.md5(data).hexdigest()

        return data, info

    @profile('put')
    @wrap_error
    def put_with_info(self, cluster, bucket, key, body, **kwargs):
        """Upload *body* as an object.

        Args:
            cluster: Not used by the body itself; ``wrap_error`` passes it
                to the exception it builds on failure.
            bucket: Bucket name, ``str`` (encoded as UTF-8) or bytes.
            key: Object key, ``str`` (encoded as UTF-8) or bytes.
            body: Payload; must be ``bytes`` or ``bytearray``.
            **kwargs: Accepted but not read.

        Returns:
            ``(result, info)`` where ``result`` is what ``put_object``
            returned and ``info`` is currently always an empty dict.

        Raises:
            NotImplementedError: If *body* is not ``bytes``/``bytearray``.
        """
        info = {}  # TODO: no metadata is collected for put yet
        if not isinstance(body, (bytes, bytearray)):
            # The original message had a stray literal "f" before the
            # placeholder ("unsupported type f<class ...>").
            raise NotImplementedError(f'unsupported type {type(body)}')
        result = self._client.put_object(
            self._to_bytes(bucket), self._to_bytes(key), body)
        return result, info