zjowowen's picture
init space
079c32c
raw
history blame
4.85 kB
import json
import socket
import time
from typing import Optional, Any, Mapping, Callable, Type, Tuple
import requests
from requests import HTTPError
from urlobject import URLObject
from urlobject.path import URLPath
from .common import translate_dict_func
def get_host_ip() -> Optional[str]:
s = None
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(('8.8.8.8', 80))
ip = s.getsockname()[0]
finally:
if s is not None:
s.close()
return ip
_DEFAULT_HTTP_PORT = 80
_DEFAULT_HTTPS_PORT = 443
def split_http_address(address: str, default_port: Optional[int] = None) -> Tuple[str, int, bool, str]:
_url = URLObject(address)
_host = _url.hostname
_https = (_url.scheme.lower()) == 'https'
_port = _url.port or default_port or (_DEFAULT_HTTPS_PORT if _https else _DEFAULT_HTTP_PORT)
_path = str(_url.path) or ''
return _host, _port, _https, _path
DEFAULT_RETRIES = 5
DEFAULT_RETRY_WAITING = 1.0
class HttpEngine:
def __init__(self, host: str, port: int, https: bool = False, path: str = None):
self.__base_url = URLObject().with_scheme('https' if https else 'http') \
.with_hostname(host).with_port(port).add_path(path or '')
self.__session = requests.session()
self.__session.trust_env = False
# noinspection PyMethodMayBeStatic
def _data_process(self, data: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
return data or {}
# noinspection PyMethodMayBeStatic
def _base_headers(self) -> Mapping[str, None]:
return {}
def _error_handler(self, err: Exception):
raise err
def get_url(self, path: str = None):
original_segments = self.__base_url.path.segments
path_segments = URLPath().add(path or '').segments
return str(self.__base_url.with_path(URLPath.join_segments(original_segments + path_segments)))
def __single_request(
self,
method: str,
path: str,
data: Optional[Mapping[str, Any]] = None,
headers: Optional[Mapping[str, Any]] = None,
params: Optional[Mapping[str, Any]] = None,
raise_for_status: bool = True
):
response = self.__session.request(
method=method,
url=self.get_url(path),
data=json.dumps(self._data_process(data) or {}),
headers=headers,
params=params or {},
)
if raise_for_status:
response.raise_for_status()
return response
def request(
self,
method: str,
path: str,
data: Optional[Mapping[str, Any]] = None,
headers: Optional[Mapping[str, Any]] = None,
params: Optional[Mapping[str, Any]] = None,
raise_for_status: bool = True,
retries: Optional[int] = None,
retry_waiting: Optional[float] = None,
) -> requests.Response:
_headers = dict(self._base_headers())
_headers.update(headers or {})
retries = retries or DEFAULT_RETRIES
retry_waiting = retry_waiting or DEFAULT_RETRY_WAITING
try:
_current_retries = 0
while True:
try:
response = self.__single_request(method, path, data, _headers, params, raise_for_status)
except requests.exceptions.HTTPError as err:
raise err
except requests.exceptions.RequestException as err:
_current_retries += 1
if _current_retries > retries:
raise err
else:
time.sleep(retry_waiting)
else:
break
except Exception as e:
self._error_handler(e)
else:
return response
def get_http_engine_class(
headers: Mapping[str, Callable[..., Any]],
data_processor: Optional[Callable[[Mapping[str, Any]], Mapping[str, Any]]] = None,
http_error_gene: Optional[Callable[[HTTPError], Exception]] = None,
) -> Callable[..., Type[HttpEngine]]:
def _func(*args, **kwargs) -> Type[HttpEngine]:
class _HttpEngine(HttpEngine):
def _data_process(self, data: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
return (data_processor or (lambda d: d or {}))(data or {})
def _base_headers(self) -> Mapping[str, None]:
return translate_dict_func(headers)(*args, **kwargs)
def _error_handler(self, err: Exception):
if http_error_gene is not None and isinstance(err, HTTPError):
raise http_error_gene(err)
else:
raise err
return _HttpEngine
return _func