youngseng's picture
Upload 187 files
da855ff
import logging
import os
import sys
from copy import deepcopy
from dateutil.relativedelta import relativedelta
def _get_logger(path_to_file, console_level=logging.DEBUG, file_level=logging.WARNING):
""" Create a logger object to write in the console and in a file
:param path_to_file: name of the log file
:param console_level: logging level to write in the console
:param file_level: logging level to write in the file
:return: logger object
"""
# create folder for log file
os.makedirs(os.path.dirname(path_to_file), exist_ok=True)
# create logger
logger = logging.getLogger(path_to_file)
logger.setLevel(logging.DEBUG)
# make it display in the console
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(console_level)
formatter = logging.Formatter('%(message)s', datefmt='%m/%d/%Y %I:%M:%S')
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# make it write on file
f_handler = logging.FileHandler(path_to_file)
f_handler.setLevel(file_level)
formatter_f = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%m/%d/%Y %I:%M:%S')
f_handler.setFormatter(formatter_f)
logger.addHandler(f_handler)
return logger
def _get_root_logger(console_level=logging.DEBUG):
""" Create a logger object to write in the console
:param console_level: logging level to write in the console
:return: logger object
"""
# create logger object
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
# make it display in the console
console_handler = logging.StreamHandler(sys.stdout)
# check if there is already one stream handler
already_exist = False
for l in logger.handlers:
if type(console_handler) == type(l):
already_exist = True
if not already_exist:
console_handler.setLevel(console_level)
formatter = logging.Formatter('(root_logger) [%(levelname)s] %(message)s', datefmt='%m/%d/%Y %I:%M:%S')
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
return logger
def _format_logging_level_arg(log_level):
""" Convert logging level arg to its corresponding int logging level
:param log_level: arg referring to the desired logging level
:return: int corresponding to the desired logging level
"""
if not isinstance(log_level, str):
if log_level not in [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR]:
print('LOGGING LEVEL NOT RECOGNIZED -- Setting it to DEBUG')
return logging.DEBUG
else:
return log_level
else:
if log_level.lower().strip() == 'debug':
return logging.DEBUG
elif log_level.lower().strip() == 'info':
return logging.INFO
elif log_level.lower().strip() in ['warn', 'warning']:
return logging.WARNING
elif log_level.lower().strip() == 'error':
return logging.ERROR
else:
print('LOGGING LEVEL NOT RECOGNIZED -- Setting it to DEBUG')
return logging.DEBUG
def get_logger_from_arg(logger=None, console_level=logging.DEBUG, file_level=logging.WARNING):
""" Create logger instance to display information
:param logger: either None, string or logger instance
can also be a dict of keywords: {'logger': .., console_level: .., file_level: ..}
:param console_level: (string or int) logging level to write in the console
:param file_level: (string or int) logging level to write in a file
:return: logger object
"""
# check if keyword arguments were passed into a dictionary
if isinstance(logger, dict):
logger = get_logger_from_arg(**logger)
# directly check if logger already exists
elif not isinstance(logger, type(logging.getLogger())) and not isinstance(logger, type(logging.getLogger('dummy'))):
# convert arguments to logging.level (ints)
console_level = _format_logging_level_arg(console_level)
file_level = _format_logging_level_arg(file_level)
# if the logger is an empty string, return the console print logger
if logger == '' or logger is None:
logger = ConsolePrintLogger(console_level)
# if logger is a string, it is assumed to be the path of the logger object
elif isinstance(logger, str):
logger = _get_logger(logger, console_level, file_level)
# if it is a fake logger continue
elif isinstance(logger, FakeLogger) or isinstance(logger, ConsolePrintLogger):
pass
# default logger
else:
logger = ConsolePrintLogger()
return logger
def get_args_from_logger(logger):
""" Retrieve args that were given to create logger object
:param logger: logger object that was created using get_logger_from_arg()
:return: dictionary -- {'logger': .., 'console_level': .., 'file_level': ..}
"""
# set default values
kwargs = {'logger': None, 'console_level': logging.DEBUG, 'file_level': logging.WARNING}
# iterate over handlers
for handler in logger.handlers:
if isinstance(handler, logging.FileHandler):
kwargs['file_level'] = handler.level
kwargs['logger'] = handler.baseFilename
elif isinstance(handler, logging.StreamHandler):
kwargs['console_level'] = handler.level
return kwargs
def prog_bar(i, n, bar_size=16):
""" Create a progress bar to estimate remaining time
:param i: current iteration
:param n: total number of iterations
:param bar_size: size of the bar
:return: a visualisation of the progress bar
"""
bar = ''
done = (i * bar_size) // n
for j in range(bar_size):
bar += '█' if j <= done else '░'
message = f'{bar} {i}/{n}'
return message
def estimate_required_time(nb_items_in_list, current_index, time_elapsed, interval=100):
""" Compute a remaining time estimation to process all items contained in a list
:param nb_items_in_list: all list items that have to be processed
:param current_index: current list index, contained in [0, nb_items_in_list - 1]
:param time_elapsed: time elapsed to process current_index items in the list
:param interval: estimate remaining time when (current_index % interval) == 0
:return: time elapsed since the last time estimation
"""
current_index += 1 # increment current_idx by 1
if current_index % interval == 0 or current_index == nb_items_in_list:
# make time estimation and put to string format
seconds = (nb_items_in_list - current_index) * (time_elapsed / current_index)
time_estimation = relativedelta(seconds=int(seconds))
time_estimation_string = f'{time_estimation.hours:02}:{time_estimation.minutes:02}:{time_estimation.seconds:02}'
# extract progress bar
progress_bar = prog_bar(i=current_index, n=nb_items_in_list)
# display info
if current_index == nb_items_in_list:
sys.stdout.write(f'\r{progress_bar} -- estimated required time = {time_estimation_string} -- Finished!')
else:
sys.stdout.write(f'\r{progress_bar} -- estimated required time = {time_estimation_string}')
def simple_table(item_tuples, logger=None):
""" Display tuple items in a table
:param item_tuples: items to display. Each tuple item is composed of two components (heading, cell)
:param logger: arg to create logger object
"""
# get logger object
logger = get_logger_from_arg(logger)
# initialize variables
border_pattern = '+---------------------------------------'
whitespace = ' '
# extract table items
headings, cells, = [], []
for item in item_tuples:
# extract heading and cell
heading, cell = str(item[0]), str(item[1])
# create padding
pad_head = True if len(heading) < len(cell) else False
pad = abs(len(heading) - len(cell))
pad = whitespace[:pad]
pad_left = pad[:len(pad)//2]
pad_right = pad[len(pad)//2:]
if pad_head: # pad heading
heading = pad_left + heading + pad_right
else: # pad cell
cell = pad_left + cell + pad_right
headings += [heading]
cells += [cell]
# create the table
border, head, body = '', '', ''
for i in range(len(item_tuples)):
temp_head = f'| {headings[i]} '
temp_body = f'| {cells[i]} '
border += border_pattern[:len(temp_head)]
head += temp_head
body += temp_body
if i == len(item_tuples) - 1:
head += '|'
body += '|'
border += '+'
# display the table
logger.info(border)
logger.info(head)
logger.info(border)
logger.info(body)
logger.info(border)
logger.info(' ')
def get_all_handler_parameters_from_logger(logger):
""" Extract handler parameters from a logger object
:param logger: logger object
:return: list of handler parameters contained in logger object
"""
# initialize list
handler_list = list()
# iterate over handler parameters
for h in logger.handlers:
if isinstance(h, logging.FileHandler):
handler_list.append({'type': type(h),
'level': h.level,
'file_name': h.baseFilename,
'format': deepcopy(h.formatter),
'object': None})
else:
handler_list.append({'type': type(h),
'level': h.level,
'format':deepcopy(h.formatter),
'object': None})
return handler_list
class FakeLogger:
"""
FakeLogger is used in multi-processed functions. It replaces a normal logger object.
It packages message and send them to a queue that a listener will read and write in a proper logger object.
"""
# class variables
queue = None
propagate = False
level = 0
def __init__(self, queue):
self.queue = queue
def send_fake_message_on_queue(self, level, msg):
self.queue.put((level, msg))
def send_warning_of_fake_logger(self, msg):
msg = f'Logging when using LaFAT multiprocess facilities disables some logger functionality such as: {msg}'
self.send_fake_message_on_queue(level=logging.WARNING, msg=msg)
def critical(self, message, *args, **kwargs):
self.send_fake_message_on_queue(level=logging.CRITICAL, msg=message)
def error(self, message, *args, **kwargs):
self.send_fake_message_on_queue(level=logging.ERROR, msg=message)
def warn(self, message, *args, **kwargs):
self.send_fake_message_on_queue(level=logging.WARNING, msg=message)
def warning(self, message, *args, **kwargs):
self.send_fake_message_on_queue(level=logging.WARNING, msg=message)
def info(self, message, *args, **kwargs):
self.send_fake_message_on_queue(level=logging.INFO, msg=message)
def debug(self, message, *args, **kwargs):
self.send_fake_message_on_queue(level=logging.DEBUG, msg=message)
def log(self, message, *args, **kwargs):
self.send_fake_message_on_queue(level=self.level, msg=message)
def exception(self, message, *args, **kwargs):
self.send_fake_message_on_queue(level=logging.ERROR, msg=message)
def handle(self, record):
self.queue.put(record)
def setLevel(self, level):
self.level = level
def isEnabledFor(self, level):
self.send_warning_of_fake_logger(f'isEnabledFor({level})')
return True
def getEffectiveLevel(self):
self.send_warning_of_fake_logger(f'getEffectiveLevel()')
return 0
def getChild(self, *args, **kwargs):
self.send_warning_of_fake_logger(f'getChild()')
return None
def addFilter(self, *args, **kwargs):
self.send_warning_of_fake_logger(f'addFilter()')
def removeFilter(self, *args, **kwargs):
self.send_warning_of_fake_logger(f'removeFilter()')
def filter(self, *args, **kwargs):
self.send_warning_of_fake_logger(f'filter()')
def addHandler(self, *args, **kwargs):
self.send_warning_of_fake_logger(f'addHandler()')
def removeHandler(self, *args, **kwargs):
self.send_warning_of_fake_logger(f'removeHandler()')
def findCaller(self, *args, **kwargs):
self.send_warning_of_fake_logger(f'findCaller()')
def makeRecord(self, *args, **kwargs):
self.send_warning_of_fake_logger(f'makeRecord()')
def hasHandlers(self):
return False
class ConsolePrintLogger:
"""
FakeLogger is used in multi-processed functions. It replaces a normal logger object.
It packages message and send them to a queue that a listener will read and write in a proper logger object.
"""
# class variables
level = 0
def __init__(self, level=0):
self.level = level
def critical(self, message, *args, **kwargs):
if self.level <= logging.CRITICAL:
print(f"[CRITICAL]: {message}")
def error(self, message, *args, **kwargs):
if self.level <= logging.ERROR:
print(f"[ERROR]: {message}")
def warn(self, message, *args, **kwargs):
if self.level <= logging.WARNING:
print(f"[WARNING]: {message}")
def warning(self, message, *args, **kwargs):
if self.level <= logging.WARNING:
print(f"[WARNING]: {message}")
def info(self, message, *args, **kwargs):
if self.level <= logging.INFO:
print(f"[INFO]: {message}")
def debug(self, message, *args, **kwargs):
if self.level <= logging.DEBUG:
print(f"[DEBUG]: {message}")
def log(self, message, *args, **kwargs):
print(f"[LOG]: {message}")
def exception(self, message, *args, **kwargs):
print(f"[EXCEPTION]: {message}")
def handle(self, record):
raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED HANDLES")
def setLevel(self, level):
raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED LEVELS")
def isEnabledFor(self, level):
return True
def getEffectiveLevel(self):
raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED LEVELS")
return 0
def getChild(self, *args, **kwargs):
raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE CHILDS")
return None
def addFilter(self, *args, **kwargs):
raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED FILTERS")
def removeFilter(self, *args, **kwargs):
raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED FILTERS")
def filter(self, *args, **kwargs):
raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED FILTERS")
def addHandler(self, *args, **kwargs):
raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED HANDLERS")
def removeHandler(self, *args, **kwargs):
raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED HANDLERS")
def findCaller(self, *args, **kwargs):
raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED CALLERS")
def makeRecord(self, *args, **kwargs):
raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED RECORDS")
def hasHandlers(self):
raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED HANDLERS")