|
import logging |
|
import os |
|
import sys |
|
|
|
from copy import deepcopy |
|
from dateutil.relativedelta import relativedelta |
|
|
|
|
|
def _get_logger(path_to_file, console_level=logging.DEBUG, file_level=logging.WARNING): |
|
""" Create a logger object to write in the console and in a file |
|
|
|
:param path_to_file: name of the log file |
|
:param console_level: logging level to write in the console |
|
:param file_level: logging level to write in the file |
|
|
|
:return: logger object |
|
""" |
|
|
|
os.makedirs(os.path.dirname(path_to_file), exist_ok=True) |
|
|
|
|
|
logger = logging.getLogger(path_to_file) |
|
logger.setLevel(logging.DEBUG) |
|
|
|
|
|
console_handler = logging.StreamHandler(sys.stdout) |
|
console_handler.setLevel(console_level) |
|
formatter = logging.Formatter('%(message)s', datefmt='%m/%d/%Y %I:%M:%S') |
|
console_handler.setFormatter(formatter) |
|
logger.addHandler(console_handler) |
|
|
|
|
|
f_handler = logging.FileHandler(path_to_file) |
|
f_handler.setLevel(file_level) |
|
formatter_f = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%m/%d/%Y %I:%M:%S') |
|
f_handler.setFormatter(formatter_f) |
|
logger.addHandler(f_handler) |
|
|
|
return logger |
|
|
|
|
|
def _get_root_logger(console_level=logging.DEBUG): |
|
""" Create a logger object to write in the console |
|
|
|
:param console_level: logging level to write in the console |
|
|
|
:return: logger object |
|
""" |
|
|
|
logger = logging.getLogger() |
|
logger.setLevel(logging.DEBUG) |
|
|
|
|
|
console_handler = logging.StreamHandler(sys.stdout) |
|
|
|
|
|
already_exist = False |
|
for l in logger.handlers: |
|
if type(console_handler) == type(l): |
|
already_exist = True |
|
|
|
if not already_exist: |
|
console_handler.setLevel(console_level) |
|
formatter = logging.Formatter('(root_logger) [%(levelname)s] %(message)s', datefmt='%m/%d/%Y %I:%M:%S') |
|
console_handler.setFormatter(formatter) |
|
logger.addHandler(console_handler) |
|
|
|
return logger |
|
|
|
|
|
def _format_logging_level_arg(log_level): |
|
""" Convert logging level arg to its corresponding int logging level |
|
|
|
:param log_level: arg referring to the desired logging level |
|
|
|
:return: int corresponding to the desired logging level |
|
""" |
|
if not isinstance(log_level, str): |
|
if log_level not in [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR]: |
|
print('LOGGING LEVEL NOT RECOGNIZED -- Setting it to DEBUG') |
|
return logging.DEBUG |
|
else: |
|
return log_level |
|
|
|
else: |
|
if log_level.lower().strip() == 'debug': |
|
return logging.DEBUG |
|
elif log_level.lower().strip() == 'info': |
|
return logging.INFO |
|
elif log_level.lower().strip() in ['warn', 'warning']: |
|
return logging.WARNING |
|
elif log_level.lower().strip() == 'error': |
|
return logging.ERROR |
|
else: |
|
print('LOGGING LEVEL NOT RECOGNIZED -- Setting it to DEBUG') |
|
return logging.DEBUG |
|
|
|
|
|
def get_logger_from_arg(logger=None, console_level=logging.DEBUG, file_level=logging.WARNING): |
|
""" Create logger instance to display information |
|
|
|
:param logger: either None, string or logger instance |
|
can also be a dict of keywords: {'logger': .., console_level: .., file_level: ..} |
|
:param console_level: (string or int) logging level to write in the console |
|
:param file_level: (string or int) logging level to write in a file |
|
|
|
:return: logger object |
|
""" |
|
|
|
if isinstance(logger, dict): |
|
logger = get_logger_from_arg(**logger) |
|
|
|
|
|
elif not isinstance(logger, type(logging.getLogger())) and not isinstance(logger, type(logging.getLogger('dummy'))): |
|
|
|
console_level = _format_logging_level_arg(console_level) |
|
file_level = _format_logging_level_arg(file_level) |
|
|
|
|
|
if logger == '' or logger is None: |
|
logger = ConsolePrintLogger(console_level) |
|
|
|
|
|
elif isinstance(logger, str): |
|
logger = _get_logger(logger, console_level, file_level) |
|
|
|
|
|
elif isinstance(logger, FakeLogger) or isinstance(logger, ConsolePrintLogger): |
|
pass |
|
|
|
|
|
else: |
|
logger = ConsolePrintLogger() |
|
|
|
return logger |
|
|
|
|
|
def get_args_from_logger(logger): |
|
""" Retrieve args that were given to create logger object |
|
|
|
:param logger: logger object that was created using get_logger_from_arg() |
|
|
|
:return: dictionary -- {'logger': .., 'console_level': .., 'file_level': ..} |
|
""" |
|
|
|
kwargs = {'logger': None, 'console_level': logging.DEBUG, 'file_level': logging.WARNING} |
|
|
|
|
|
for handler in logger.handlers: |
|
if isinstance(handler, logging.FileHandler): |
|
kwargs['file_level'] = handler.level |
|
kwargs['logger'] = handler.baseFilename |
|
|
|
elif isinstance(handler, logging.StreamHandler): |
|
kwargs['console_level'] = handler.level |
|
|
|
return kwargs |
|
|
|
|
|
def prog_bar(i, n, bar_size=16): |
|
""" Create a progress bar to estimate remaining time |
|
|
|
:param i: current iteration |
|
:param n: total number of iterations |
|
:param bar_size: size of the bar |
|
|
|
:return: a visualisation of the progress bar |
|
""" |
|
bar = '' |
|
done = (i * bar_size) // n |
|
|
|
for j in range(bar_size): |
|
bar += '█' if j <= done else '░' |
|
|
|
message = f'{bar} {i}/{n}' |
|
return message |
|
|
|
|
|
def estimate_required_time(nb_items_in_list, current_index, time_elapsed, interval=100): |
|
""" Compute a remaining time estimation to process all items contained in a list |
|
|
|
:param nb_items_in_list: all list items that have to be processed |
|
:param current_index: current list index, contained in [0, nb_items_in_list - 1] |
|
:param time_elapsed: time elapsed to process current_index items in the list |
|
:param interval: estimate remaining time when (current_index % interval) == 0 |
|
|
|
:return: time elapsed since the last time estimation |
|
""" |
|
current_index += 1 |
|
if current_index % interval == 0 or current_index == nb_items_in_list: |
|
|
|
seconds = (nb_items_in_list - current_index) * (time_elapsed / current_index) |
|
time_estimation = relativedelta(seconds=int(seconds)) |
|
time_estimation_string = f'{time_estimation.hours:02}:{time_estimation.minutes:02}:{time_estimation.seconds:02}' |
|
|
|
|
|
progress_bar = prog_bar(i=current_index, n=nb_items_in_list) |
|
|
|
|
|
if current_index == nb_items_in_list: |
|
sys.stdout.write(f'\r{progress_bar} -- estimated required time = {time_estimation_string} -- Finished!') |
|
else: |
|
sys.stdout.write(f'\r{progress_bar} -- estimated required time = {time_estimation_string}') |
|
|
|
|
|
def simple_table(item_tuples, logger=None): |
|
""" Display tuple items in a table |
|
|
|
:param item_tuples: items to display. Each tuple item is composed of two components (heading, cell) |
|
:param logger: arg to create logger object |
|
""" |
|
|
|
logger = get_logger_from_arg(logger) |
|
|
|
|
|
border_pattern = '+---------------------------------------' |
|
whitespace = ' ' |
|
|
|
|
|
headings, cells, = [], [] |
|
for item in item_tuples: |
|
|
|
heading, cell = str(item[0]), str(item[1]) |
|
|
|
|
|
pad_head = True if len(heading) < len(cell) else False |
|
pad = abs(len(heading) - len(cell)) |
|
pad = whitespace[:pad] |
|
pad_left = pad[:len(pad)//2] |
|
pad_right = pad[len(pad)//2:] |
|
|
|
if pad_head: |
|
heading = pad_left + heading + pad_right |
|
else: |
|
cell = pad_left + cell + pad_right |
|
|
|
headings += [heading] |
|
cells += [cell] |
|
|
|
|
|
border, head, body = '', '', '' |
|
for i in range(len(item_tuples)): |
|
temp_head = f'| {headings[i]} ' |
|
temp_body = f'| {cells[i]} ' |
|
|
|
border += border_pattern[:len(temp_head)] |
|
head += temp_head |
|
body += temp_body |
|
|
|
if i == len(item_tuples) - 1: |
|
head += '|' |
|
body += '|' |
|
border += '+' |
|
|
|
|
|
logger.info(border) |
|
logger.info(head) |
|
logger.info(border) |
|
logger.info(body) |
|
logger.info(border) |
|
logger.info(' ') |
|
|
|
|
|
def get_all_handler_parameters_from_logger(logger): |
|
""" Extract handler parameters from a logger object |
|
|
|
:param logger: logger object |
|
|
|
:return: list of handler parameters contained in logger object |
|
""" |
|
|
|
handler_list = list() |
|
|
|
|
|
for h in logger.handlers: |
|
if isinstance(h, logging.FileHandler): |
|
handler_list.append({'type': type(h), |
|
'level': h.level, |
|
'file_name': h.baseFilename, |
|
'format': deepcopy(h.formatter), |
|
'object': None}) |
|
else: |
|
handler_list.append({'type': type(h), |
|
'level': h.level, |
|
'format':deepcopy(h.formatter), |
|
'object': None}) |
|
|
|
return handler_list |
|
|
|
|
|
class FakeLogger: |
|
""" |
|
FakeLogger is used in multi-processed functions. It replaces a normal logger object. |
|
It packages message and send them to a queue that a listener will read and write in a proper logger object. |
|
""" |
|
|
|
queue = None |
|
propagate = False |
|
level = 0 |
|
|
|
def __init__(self, queue): |
|
self.queue = queue |
|
|
|
def send_fake_message_on_queue(self, level, msg): |
|
self.queue.put((level, msg)) |
|
|
|
def send_warning_of_fake_logger(self, msg): |
|
msg = f'Logging when using LaFAT multiprocess facilities disables some logger functionality such as: {msg}' |
|
self.send_fake_message_on_queue(level=logging.WARNING, msg=msg) |
|
|
|
def critical(self, message, *args, **kwargs): |
|
self.send_fake_message_on_queue(level=logging.CRITICAL, msg=message) |
|
|
|
def error(self, message, *args, **kwargs): |
|
self.send_fake_message_on_queue(level=logging.ERROR, msg=message) |
|
|
|
def warn(self, message, *args, **kwargs): |
|
self.send_fake_message_on_queue(level=logging.WARNING, msg=message) |
|
|
|
def warning(self, message, *args, **kwargs): |
|
self.send_fake_message_on_queue(level=logging.WARNING, msg=message) |
|
|
|
def info(self, message, *args, **kwargs): |
|
self.send_fake_message_on_queue(level=logging.INFO, msg=message) |
|
|
|
def debug(self, message, *args, **kwargs): |
|
self.send_fake_message_on_queue(level=logging.DEBUG, msg=message) |
|
|
|
def log(self, message, *args, **kwargs): |
|
self.send_fake_message_on_queue(level=self.level, msg=message) |
|
|
|
def exception(self, message, *args, **kwargs): |
|
self.send_fake_message_on_queue(level=logging.ERROR, msg=message) |
|
|
|
def handle(self, record): |
|
self.queue.put(record) |
|
|
|
def setLevel(self, level): |
|
self.level = level |
|
|
|
def isEnabledFor(self, level): |
|
self.send_warning_of_fake_logger(f'isEnabledFor({level})') |
|
return True |
|
|
|
def getEffectiveLevel(self): |
|
self.send_warning_of_fake_logger(f'getEffectiveLevel()') |
|
return 0 |
|
|
|
def getChild(self, *args, **kwargs): |
|
self.send_warning_of_fake_logger(f'getChild()') |
|
return None |
|
|
|
def addFilter(self, *args, **kwargs): |
|
self.send_warning_of_fake_logger(f'addFilter()') |
|
|
|
def removeFilter(self, *args, **kwargs): |
|
self.send_warning_of_fake_logger(f'removeFilter()') |
|
|
|
def filter(self, *args, **kwargs): |
|
self.send_warning_of_fake_logger(f'filter()') |
|
|
|
def addHandler(self, *args, **kwargs): |
|
self.send_warning_of_fake_logger(f'addHandler()') |
|
|
|
def removeHandler(self, *args, **kwargs): |
|
self.send_warning_of_fake_logger(f'removeHandler()') |
|
|
|
def findCaller(self, *args, **kwargs): |
|
self.send_warning_of_fake_logger(f'findCaller()') |
|
|
|
def makeRecord(self, *args, **kwargs): |
|
self.send_warning_of_fake_logger(f'makeRecord()') |
|
|
|
def hasHandlers(self): |
|
return False |
|
|
|
|
|
class ConsolePrintLogger: |
|
""" |
|
FakeLogger is used in multi-processed functions. It replaces a normal logger object. |
|
It packages message and send them to a queue that a listener will read and write in a proper logger object. |
|
""" |
|
|
|
level = 0 |
|
|
|
def __init__(self, level=0): |
|
self.level = level |
|
|
|
def critical(self, message, *args, **kwargs): |
|
if self.level <= logging.CRITICAL: |
|
print(f"[CRITICAL]: {message}") |
|
|
|
def error(self, message, *args, **kwargs): |
|
if self.level <= logging.ERROR: |
|
print(f"[ERROR]: {message}") |
|
|
|
def warn(self, message, *args, **kwargs): |
|
if self.level <= logging.WARNING: |
|
print(f"[WARNING]: {message}") |
|
|
|
def warning(self, message, *args, **kwargs): |
|
if self.level <= logging.WARNING: |
|
print(f"[WARNING]: {message}") |
|
|
|
def info(self, message, *args, **kwargs): |
|
if self.level <= logging.INFO: |
|
print(f"[INFO]: {message}") |
|
|
|
def debug(self, message, *args, **kwargs): |
|
if self.level <= logging.DEBUG: |
|
print(f"[DEBUG]: {message}") |
|
|
|
def log(self, message, *args, **kwargs): |
|
print(f"[LOG]: {message}") |
|
|
|
def exception(self, message, *args, **kwargs): |
|
print(f"[EXCEPTION]: {message}") |
|
|
|
def handle(self, record): |
|
raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED HANDLES") |
|
|
|
def setLevel(self, level): |
|
raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED LEVELS") |
|
|
|
def isEnabledFor(self, level): |
|
return True |
|
|
|
def getEffectiveLevel(self): |
|
raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED LEVELS") |
|
return 0 |
|
|
|
def getChild(self, *args, **kwargs): |
|
raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE CHILDS") |
|
return None |
|
|
|
def addFilter(self, *args, **kwargs): |
|
raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED FILTERS") |
|
|
|
def removeFilter(self, *args, **kwargs): |
|
raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED FILTERS") |
|
|
|
def filter(self, *args, **kwargs): |
|
raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED FILTERS") |
|
|
|
def addHandler(self, *args, **kwargs): |
|
raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED HANDLERS") |
|
|
|
def removeHandler(self, *args, **kwargs): |
|
raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED HANDLERS") |
|
|
|
def findCaller(self, *args, **kwargs): |
|
raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED CALLERS") |
|
|
|
def makeRecord(self, *args, **kwargs): |
|
raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED RECORDS") |
|
|
|
def hasHandlers(self): |
|
raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED HANDLERS") |
|
|