File size: 4,846 Bytes
079c32c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
import json
import socket
import time
from typing import Optional, Any, Mapping, Callable, Type, Tuple
import requests
from requests import HTTPError
from urlobject import URLObject
from urlobject.path import URLPath
from .common import translate_dict_func
def get_host_ip() -> Optional[str]:
    """
    Return the local address of a UDP socket connected to ``8.8.8.8:80``.

    A datagram socket is created, connected to ``('8.8.8.8', 80)``, and the
    first element of its ``getsockname()`` result is returned. The socket is
    always closed before returning.

    Returns:
        The first element of the socket's own address (the IP string).

    Raises:
        Whatever ``socket.socket`` / ``connect`` / ``getsockname`` raise;
        nothing is caught here.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
        probe.connect(('8.8.8.8', 80))
        local_address = probe.getsockname()
    return local_address[0]
# Fallback ports used by split_http_address when neither the address itself
# nor the ``default_port`` argument provides a port.
_DEFAULT_HTTP_PORT = 80  # used when the scheme is not 'https'
_DEFAULT_HTTPS_PORT = 443  # used when the scheme is 'https'
def split_http_address(address: str, default_port: Optional[int] = None) -> Tuple[str, int, bool, str]:
    """
    Break an HTTP(S) address into host, port, https flag and path.

    Args:
        address: URL string to parse (parsed with ``URLObject``).
        default_port: Port to use when the address carries none. When this is
            also missing, 443 is used for ``https`` and 80 otherwise.

    Returns:
        A ``(host, port, https, path)`` tuple, where ``https`` is True only
        when the lower-cased scheme equals ``'https'``.
    """
    parsed = URLObject(address)
    host = parsed.hostname
    scheme = parsed.scheme.lower()
    is_https = scheme == 'https'

    # Port precedence: explicit port in the address, then the caller's
    # default, then the scheme's well-known port.
    port = parsed.port
    if not port:
        port = default_port
    if not port:
        if is_https:
            port = _DEFAULT_HTTPS_PORT
        else:
            port = _DEFAULT_HTTP_PORT

    path = str(parsed.path)
    if not path:
        path = ''

    return host, port, is_https, path
# Fallback values used by HttpEngine.request for its ``retries`` and
# ``retry_waiting`` arguments.
DEFAULT_RETRIES = 5  # failed attempts tolerated before the error is raised
DEFAULT_RETRY_WAITING = 1.0  # passed to time.sleep between attempts
class HttpEngine:
    """
    Small HTTP client bound to one base URL, with retry on connection errors.

    Subclasses can customise behaviour through three hooks:

    * ``_data_process`` - transform the request payload before it is sent.
    * ``_base_headers`` - provide headers added to every request.
    * ``_error_handler`` - decide what to raise when a request fails.
    """

    def __init__(self, host: str, port: int, https: bool = False, path: Optional[str] = None):
        """
        Args:
            host: Host name of the remote server.
            port: Port of the remote server.
            https: Use the ``https`` scheme when True, ``http`` otherwise.
            path: Optional base path prepended to every request path.
        """
        self.__base_url = URLObject().with_scheme('https' if https else 'http') \
            .with_hostname(host).with_port(port).add_path(path or '')
        self.__session = requests.session()
        # Do not pick up proxy/auth configuration from the environment.
        self.__session.trust_env = False

    # noinspection PyMethodMayBeStatic
    def _data_process(self, data: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
        """Hook: return the payload to send; the base version maps falsy data to ``{}``."""
        return data or {}

    # noinspection PyMethodMayBeStatic
    def _base_headers(self) -> Mapping[str, Any]:
        """Hook: return headers added to every request; the base version adds none."""
        return {}

    def _error_handler(self, err: Exception):
        """Hook: called with any exception raised by a request; the base version re-raises it."""
        raise err

    def get_url(self, path: Optional[str] = None):
        """
        Build the full URL for ``path`` relative to the engine's base URL.

        The path segments of the base URL and those of ``path`` are
        concatenated, so ``path`` is always appended to the base path.

        Args:
            path: Path to append; ``None`` or ``''`` is treated as ``''``.

        Returns:
            The resulting URL as a string.
        """
        original_segments = self.__base_url.path.segments
        path_segments = URLPath().add(path or '').segments
        return str(self.__base_url.with_path(URLPath.join_segments(original_segments + path_segments)))

    def __single_request(
            self,
            method: str,
            path: str,
            data: Optional[Mapping[str, Any]] = None,
            headers: Optional[Mapping[str, Any]] = None,
            params: Optional[Mapping[str, Any]] = None,
            raise_for_status: bool = True
    ):
        """
        Perform exactly one HTTP request (no retry, no error hook).

        The payload is passed through ``_data_process`` and sent as a JSON
        string in the request body.
        """
        response = self.__session.request(
            method=method,
            url=self.get_url(path),
            data=json.dumps(self._data_process(data) or {}),
            headers=headers,
            params=params or {},
        )
        if raise_for_status:
            response.raise_for_status()

        return response

    def request(
            self,
            method: str,
            path: str,
            data: Optional[Mapping[str, Any]] = None,
            headers: Optional[Mapping[str, Any]] = None,
            params: Optional[Mapping[str, Any]] = None,
            raise_for_status: bool = True,
            retries: Optional[int] = None,
            retry_waiting: Optional[float] = None,
    ) -> requests.Response:
        """
        Send a request, retrying on non-HTTP request failures.

        Args:
            method: HTTP method name.
            path: Path relative to the engine's base URL.
            data: Payload; processed by ``_data_process`` and JSON-encoded.
            headers: Extra headers; they override those of ``_base_headers``.
            params: Query-string parameters.
            raise_for_status: Call ``response.raise_for_status()`` when True.
            retries: Number of failed attempts tolerated before giving up.
                ``None`` means ``DEFAULT_RETRIES``; ``0`` disables retrying.
            retry_waiting: Value passed to ``time.sleep`` between attempts.
                ``None`` means ``DEFAULT_RETRY_WAITING``; ``0`` is honoured.

        Returns:
            The response of the first successful attempt. If an overridden
            ``_error_handler`` returns instead of raising, ``None`` is returned.

        Raises:
            Whatever ``_error_handler`` raises; the base implementation
            re-raises the original exception.
        """
        _headers = dict(self._base_headers())
        _headers.update(headers or {})

        # Compare against None explicitly: ``x or DEFAULT`` would silently
        # replace a deliberate 0 (no retries / no waiting) with the default.
        if retries is None:
            retries = DEFAULT_RETRIES
        if retry_waiting is None:
            retry_waiting = DEFAULT_RETRY_WAITING

        try:
            _current_retries = 0
            while True:
                try:
                    response = self.__single_request(method, path, data, _headers, params, raise_for_status)
                except requests.exceptions.HTTPError:
                    # HTTPError is never retried; it goes straight to the
                    # error hook.
                    raise
                except requests.exceptions.RequestException:
                    _current_retries += 1
                    if _current_retries > retries:
                        raise
                    time.sleep(retry_waiting)
                else:
                    break
        except Exception as e:
            self._error_handler(e)
        else:
            return response
def get_http_engine_class(
        headers: Mapping[str, Callable[..., Any]],
        data_processor: Optional[Callable[[Mapping[str, Any]], Mapping[str, Any]]] = None,
        http_error_gene: Optional[Callable[[HTTPError], Exception]] = None,
) -> Callable[..., Type[HttpEngine]]:
    """
    Build a factory that produces customised ``HttpEngine`` subclasses.

    Args:
        headers: Mapping handed to ``translate_dict_func``; the result is
            called with the factory's arguments to obtain the base headers.
            NOTE(review): presumably each value is a callable producing one
            header value - confirm against ``translate_dict_func``.
        data_processor: Optional payload transformer; when falsy, the payload
            is passed through unchanged (falsy payloads become ``{}``).
        http_error_gene: Optional converter applied to ``HTTPError``
            instances; its result is raised in place of the original error.

    Returns:
        A function accepting arbitrary arguments and returning the
        ``HttpEngine`` subclass configured with them.
    """

    def _passthrough(payload):
        # Default processor: keep the payload, mapping falsy values to {}.
        return payload or {}

    def _func(*args, **kwargs) -> Type[HttpEngine]:
        class _HttpEngine(HttpEngine):

            def _data_process(self, data: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
                processor = data_processor if data_processor else _passthrough
                return processor(data or {})

            def _base_headers(self) -> Mapping[str, None]:
                header_builder = translate_dict_func(headers)
                return header_builder(*args, **kwargs)

            def _error_handler(self, err: Exception):
                # Only HTTP errors are converted, and only when a converter
                # was supplied; everything else is re-raised untouched.
                if http_error_gene is None or not isinstance(err, HTTPError):
                    raise err
                raise http_error_gene(err)

        return _HttpEngine

    return _func
|