File size: 15,894 Bytes
da855ff |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 |
import logging
import os
import sys
from copy import deepcopy
from dateutil.relativedelta import relativedelta
def _get_logger(path_to_file, console_level=logging.DEBUG, file_level=logging.WARNING):
""" Create a logger object to write in the console and in a file
:param path_to_file: name of the log file
:param console_level: logging level to write in the console
:param file_level: logging level to write in the file
:return: logger object
"""
# create folder for log file
os.makedirs(os.path.dirname(path_to_file), exist_ok=True)
# create logger
logger = logging.getLogger(path_to_file)
logger.setLevel(logging.DEBUG)
# make it display in the console
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(console_level)
formatter = logging.Formatter('%(message)s', datefmt='%m/%d/%Y %I:%M:%S')
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# make it write on file
f_handler = logging.FileHandler(path_to_file)
f_handler.setLevel(file_level)
formatter_f = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%m/%d/%Y %I:%M:%S')
f_handler.setFormatter(formatter_f)
logger.addHandler(f_handler)
return logger
def _get_root_logger(console_level=logging.DEBUG):
""" Create a logger object to write in the console
:param console_level: logging level to write in the console
:return: logger object
"""
# create logger object
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
# make it display in the console
console_handler = logging.StreamHandler(sys.stdout)
# check if there is already one stream handler
already_exist = False
for l in logger.handlers:
if type(console_handler) == type(l):
already_exist = True
if not already_exist:
console_handler.setLevel(console_level)
formatter = logging.Formatter('(root_logger) [%(levelname)s] %(message)s', datefmt='%m/%d/%Y %I:%M:%S')
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
return logger
def _format_logging_level_arg(log_level):
""" Convert logging level arg to its corresponding int logging level
:param log_level: arg referring to the desired logging level
:return: int corresponding to the desired logging level
"""
if not isinstance(log_level, str):
if log_level not in [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR]:
print('LOGGING LEVEL NOT RECOGNIZED -- Setting it to DEBUG')
return logging.DEBUG
else:
return log_level
else:
if log_level.lower().strip() == 'debug':
return logging.DEBUG
elif log_level.lower().strip() == 'info':
return logging.INFO
elif log_level.lower().strip() in ['warn', 'warning']:
return logging.WARNING
elif log_level.lower().strip() == 'error':
return logging.ERROR
else:
print('LOGGING LEVEL NOT RECOGNIZED -- Setting it to DEBUG')
return logging.DEBUG
def get_logger_from_arg(logger=None, console_level=logging.DEBUG, file_level=logging.WARNING):
    """ Create logger instance to display information

    Resolution rules, in order:
      - dict: treated as keyword arguments of this function and resolved again
      - logging.Logger instance (root or named): returned unchanged
      - FakeLogger / ConsolePrintLogger instance: returned unchanged
      - non-empty string: path of the log file, a console + file logger is created
      - None, '' or any other value: a ConsolePrintLogger using console_level

    :param logger: either None, string or logger instance
                   can also be a dict of keywords: {'logger': .., console_level: .., file_level: ..}
    :param console_level: (string or int) logging level to write in the console
    :param file_level: (string or int) logging level to write in a file
    :return: logger object
    """
    # check if keyword arguments were passed into a dictionary
    if isinstance(logger, dict):
        return get_logger_from_arg(**logger)
    # directly check if logger already exists
    # (the root logger's class derives from logging.Logger, so one check covers
    # both cases without creating a throwaway named logger just to get its type)
    if isinstance(logger, logging.Logger):
        return logger
    # if it is a fake logger, it is already usable as it is
    if isinstance(logger, (FakeLogger, ConsolePrintLogger)):
        return logger
    # convert arguments to logging.level (ints)
    console_level = _format_logging_level_arg(console_level)
    file_level = _format_logging_level_arg(file_level)
    # if logger is a non-empty string, it is assumed to be the path of the log file
    if isinstance(logger, str) and logger != '':
        return _get_logger(logger, console_level, file_level)
    # empty string, None or unsupported value: return the console print logger,
    # honouring the requested console level in every case
    return ConsolePrintLogger(console_level)
def get_args_from_logger(logger):
    """ Retrieve args that were given to create logger object

    The values are read back from the handlers attached to the logger: a file
    handler provides 'logger' (its file path) and 'file_level', any other stream
    handler provides 'console_level'. Objects without a ``handlers`` attribute
    (for instance the print-based or queue-based stand-in loggers) do not raise:
    defaults are returned, with 'console_level' taken from the object's own
    non-zero ``level`` attribute when it has one.

    :param logger: logger object that was created using get_logger_from_arg()
    :return: dictionary -- {'logger': .., 'console_level': .., 'file_level': ..}
    """
    # set default values
    kwargs = {'logger': None, 'console_level': logging.DEBUG, 'file_level': logging.WARNING}
    handlers = getattr(logger, 'handlers', None)
    if handlers is None:
        # handler-less stand-in logger: its own level is the only setting available
        own_level = getattr(logger, 'level', 0)
        if own_level:
            kwargs['console_level'] = own_level
        return kwargs
    # iterate over handlers
    for handler in handlers:
        # FileHandler must be tested first because it subclasses StreamHandler
        if isinstance(handler, logging.FileHandler):
            kwargs['file_level'] = handler.level
            kwargs['logger'] = handler.baseFilename
        elif isinstance(handler, logging.StreamHandler):
            kwargs['console_level'] = handler.level
    return kwargs
def prog_bar(i, n, bar_size=16):
    """ Create a progress bar to estimate remaining time

    The number of filled cells is floor(i * bar_size / n), clamped to
    [0, bar_size]: the bar is empty at i == 0 and completely filled only once
    i reaches n. When n is not positive there is nothing to process and the bar
    is shown as complete instead of dividing by zero.

    :param i: current iteration
    :param n: total number of iterations
    :param bar_size: size of the bar
    :return: a visualisation of the progress bar
    """
    if n > 0:
        done = int((i * bar_size) // n)
        done = min(max(done, 0), bar_size)
    else:
        done = bar_size
    bar = '█' * done + '░' * (bar_size - done)
    return f'{bar} {i}/{n}'
def estimate_required_time(nb_items_in_list, current_index, time_elapsed, interval=100):
    """ Compute a remaining time estimation to process all items contained in a list

    The estimation assumes every remaining item takes the average time observed
    so far. It is written on the current console line (carriage return, no
    newline) together with a progress bar, and only when the number of processed
    items is a multiple of ``interval`` or when the last item was processed.

    :param nb_items_in_list: all list items that have to be processed
    :param current_index: current list index, contained in [0, nb_items_in_list - 1]
    :param time_elapsed: time elapsed to process current_index items in the list
    :param interval: estimate remaining time when (current_index % interval) == 0;
                     0 disables the intermediate estimations
    :return: None -- the estimation is only displayed
    """
    current_index += 1  # from 0-based index to number of processed items
    finished = current_index == nb_items_in_list
    on_interval = interval != 0 and current_index % interval == 0
    if not (on_interval or finished):
        return
    # make time estimation and put to string format
    seconds = (nb_items_in_list - current_index) * (time_elapsed / current_index)
    remaining = max(int(seconds), 0)
    # plain divmod keeps durations of a day or more in the hour field instead of
    # silently dropping the whole days
    minutes, secs = divmod(remaining, 60)
    hours, minutes = divmod(minutes, 60)
    time_estimation_string = f'{hours:02}:{minutes:02}:{secs:02}'
    # extract progress bar
    progress_bar = prog_bar(i=current_index, n=nb_items_in_list)
    # display info
    message = f'\r{progress_bar} -- estimated required time = {time_estimation_string}'
    if finished:
        message += ' -- Finished!'
    sys.stdout.write(message)
    # no newline is written, so flush explicitly to make the update visible
    sys.stdout.flush()
def simple_table(item_tuples, logger=None):
    """ Display tuple items in a table

    Every item becomes one column: its heading on the first row and its cell on
    the second row. The shorter of the two texts is centred on the longer one
    (the extra space goes to the right when the difference is odd). Padding and
    borders are computed from the actual text width, so columns of any width
    stay aligned. The table is written with ``logger.info``, followed by a line
    holding a single space.

    :param item_tuples: items to display. Each tuple item is composed of two components (heading, cell)
    :param logger: arg to create logger object
    """
    # get logger object
    logger = get_logger_from_arg(logger)

    def _centre(text, width):
        # pad text with spaces up to width, left part rounded down
        missing = width - len(text)
        left = missing // 2
        return ' ' * left + text + ' ' * (missing - left)

    # build the table column by column
    border_parts, head_parts, body_parts = [], [], []
    for item in item_tuples:
        # extract heading and cell
        heading, cell = str(item[0]), str(item[1])
        width = max(len(heading), len(cell))
        head_part = f'| {_centre(heading, width)} '
        body_part = f'| {_centre(cell, width)} '
        # the border of a column is exactly as wide as its text part
        border_parts.append('+' + '-' * (len(head_part) - 1))
        head_parts.append(head_part)
        body_parts.append(body_part)
    # close the right side of the table (nothing to close when there is no column)
    if head_parts:
        border = ''.join(border_parts) + '+'
        head = ''.join(head_parts) + '|'
        body = ''.join(body_parts) + '|'
    else:
        border, head, body = '', '', ''
    # display the table
    logger.info(border)
    logger.info(head)
    logger.info(border)
    logger.info(body)
    logger.info(border)
    logger.info(' ')
def get_all_handler_parameters_from_logger(logger):
    """ Extract handler parameters from a logger object

    One dictionary is produced per handler attached to the logger, holding the
    handler class ('type'), its level ('level'), an independent copy of its
    formatter ('format') and an 'object' slot set to None. File handlers
    additionally carry the path of their file ('file_name').

    :param logger: logger object
    :return: list of handler parameters contained in logger object
    """
    def _describe(handler):
        # keys are inserted in the order: type, level, [file_name], format, object
        params = {'type': type(handler), 'level': handler.level}
        if isinstance(handler, logging.FileHandler):
            params['file_name'] = handler.baseFilename
        params['format'] = deepcopy(handler.formatter)
        params['object'] = None
        return params

    return [_describe(handler) for handler in logger.handlers]
class FakeLogger:
    """
    FakeLogger is used in multi-processed functions. It replaces a normal logger object.
    It packages every message as a ``(level, message)`` tuple and puts it on a queue;
    a listener is expected to read the queue and write in a proper logger object.

    Logging calls accept %-style arguments like a normal logger
    (``logger.info('x=%s', x)``); the message is formatted before it is queued.
    Logger features that cannot work through the queue (filters, handlers, ...)
    only put a WARNING message on the queue.
    """
    # class variables
    queue = None  # placeholder, every instance gets its own queue in __init__
    propagate = False
    level = 0

    def __init__(self, queue):
        """
        :param queue: object with a ``put`` method that receives the (level, message) tuples
        """
        self.queue = queue

    @staticmethod
    def _format_message(message, args):
        """ Merge %-style args into message like logging does; never raises
        :param message: message object or format string
        :param args: tuple of positional logging arguments
        :return: the formatted string, or message unchanged when there is nothing to format
        """
        if not args:
            return message
        # a single non-empty dict feeds '%(name)s' placeholders, as in logging.LogRecord
        if len(args) == 1 and isinstance(args[0], dict) and args[0]:
            args = args[0]
        try:
            return str(message) % args
        except (TypeError, ValueError, KeyError):
            # args do not match the message: keep the raw message rather than fail
            return message

    def send_fake_message_on_queue(self, level, msg):
        self.queue.put((level, msg))

    def send_warning_of_fake_logger(self, msg):
        msg = f'Logging when using LaFAT multiprocess facilities disables some logger functionality such as: {msg}'
        self.send_fake_message_on_queue(level=logging.WARNING, msg=msg)

    def critical(self, message, *args, **kwargs):
        self.send_fake_message_on_queue(level=logging.CRITICAL, msg=self._format_message(message, args))

    def error(self, message, *args, **kwargs):
        self.send_fake_message_on_queue(level=logging.ERROR, msg=self._format_message(message, args))

    def warn(self, message, *args, **kwargs):
        self.send_fake_message_on_queue(level=logging.WARNING, msg=self._format_message(message, args))

    def warning(self, message, *args, **kwargs):
        self.send_fake_message_on_queue(level=logging.WARNING, msg=self._format_message(message, args))

    def info(self, message, *args, **kwargs):
        self.send_fake_message_on_queue(level=logging.INFO, msg=self._format_message(message, args))

    def debug(self, message, *args, **kwargs):
        self.send_fake_message_on_queue(level=logging.DEBUG, msg=self._format_message(message, args))

    def log(self, message, *args, **kwargs):
        """ Queue a message, either as log(message, *args) at the logger's own level
        or, like logging.Logger.log, as log(level, message, *args)
        """
        if isinstance(message, int) and args:
            # standard call form: the first argument is the level, the second the message
            level, message, args = message, args[0], args[1:]
        else:
            level = self.level
        self.send_fake_message_on_queue(level=level, msg=self._format_message(message, args))

    def exception(self, message, *args, **kwargs):
        # queued at ERROR level; no traceback is attached to the message
        self.send_fake_message_on_queue(level=logging.ERROR, msg=self._format_message(message, args))

    def handle(self, record):
        # the record is queued as is, not wrapped in a (level, message) tuple
        self.queue.put(record)

    def setLevel(self, level):
        self.level = level

    def isEnabledFor(self, level):
        self.send_warning_of_fake_logger(f'isEnabledFor({level})')
        return True

    def getEffectiveLevel(self):
        self.send_warning_of_fake_logger('getEffectiveLevel()')
        return 0

    def getChild(self, *args, **kwargs):
        self.send_warning_of_fake_logger('getChild()')
        return None

    def addFilter(self, *args, **kwargs):
        self.send_warning_of_fake_logger('addFilter()')

    def removeFilter(self, *args, **kwargs):
        self.send_warning_of_fake_logger('removeFilter()')

    def filter(self, *args, **kwargs):
        self.send_warning_of_fake_logger('filter()')

    def addHandler(self, *args, **kwargs):
        self.send_warning_of_fake_logger('addHandler()')

    def removeHandler(self, *args, **kwargs):
        self.send_warning_of_fake_logger('removeHandler()')

    def findCaller(self, *args, **kwargs):
        self.send_warning_of_fake_logger('findCaller()')

    def makeRecord(self, *args, **kwargs):
        self.send_warning_of_fake_logger('makeRecord()')

    def hasHandlers(self):
        return False
class ConsolePrintLogger:
    """
    ConsolePrintLogger is a minimal stand-in for a logger object that only prints.
    Messages are written with print() and prefixed with their level, e.g. "[INFO]: ...".
    A message is printed when the logger level is lower than or equal to the level
    of the message; log(message) and exception() always print.

    Logging calls accept %-style arguments like a normal logger
    (``logger.info('x=%s', x)``). Everything related to handlers, filters,
    records, children or changing the level is not supported and raises an Exception.
    """
    # class variables
    level = 0

    def __init__(self, level=0):
        """
        :param level: int threshold, messages below this logging level are not printed
        """
        self.level = level

    @staticmethod
    def _format_message(message, args):
        """ Merge %-style args into message like logging does; never raises
        :param message: message object or format string
        :param args: tuple of positional logging arguments
        :return: the formatted string, or message unchanged when there is nothing to format
        """
        if not args:
            return message
        # a single non-empty dict feeds '%(name)s' placeholders, as in logging.LogRecord
        if len(args) == 1 and isinstance(args[0], dict) and args[0]:
            args = args[0]
        try:
            return str(message) % args
        except (TypeError, ValueError, KeyError):
            # args do not match the message: keep the raw message rather than fail
            return message

    def critical(self, message, *args, **kwargs):
        if self.level <= logging.CRITICAL:
            print(f"[CRITICAL]: {self._format_message(message, args)}")

    def error(self, message, *args, **kwargs):
        if self.level <= logging.ERROR:
            print(f"[ERROR]: {self._format_message(message, args)}")

    def warn(self, message, *args, **kwargs):
        if self.level <= logging.WARNING:
            print(f"[WARNING]: {self._format_message(message, args)}")

    def warning(self, message, *args, **kwargs):
        if self.level <= logging.WARNING:
            print(f"[WARNING]: {self._format_message(message, args)}")

    def info(self, message, *args, **kwargs):
        if self.level <= logging.INFO:
            print(f"[INFO]: {self._format_message(message, args)}")

    def debug(self, message, *args, **kwargs):
        if self.level <= logging.DEBUG:
            print(f"[DEBUG]: {self._format_message(message, args)}")

    def log(self, message, *args, **kwargs):
        """ Print a message, either as log(message, *args), which always prints,
        or, like logging.Logger.log, as log(level, message, *args), which is
        filtered against the logger level
        """
        if isinstance(message, int) and args:
            # standard call form: the first argument is the level, the second the message
            level, message, args = message, args[0], args[1:]
            if self.level > level:
                return
        print(f"[LOG]: {self._format_message(message, args)}")

    def exception(self, message, *args, **kwargs):
        # always printed; no traceback is added to the message
        print(f"[EXCEPTION]: {self._format_message(message, args)}")

    def handle(self, record):
        raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED HANDLES")

    def setLevel(self, level):
        raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED LEVELS")

    def isEnabledFor(self, level):
        # same threshold rule as the one applied by debug(), info(), ...
        return self.level <= level

    def getEffectiveLevel(self):
        raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED LEVELS")

    def getChild(self, *args, **kwargs):
        raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE CHILDS")

    def addFilter(self, *args, **kwargs):
        raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED FILTERS")

    def removeFilter(self, *args, **kwargs):
        raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED FILTERS")

    def filter(self, *args, **kwargs):
        raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED FILTERS")

    def addHandler(self, *args, **kwargs):
        raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED HANDLERS")

    def removeHandler(self, *args, **kwargs):
        raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED HANDLERS")

    def findCaller(self, *args, **kwargs):
        raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED CALLERS")

    def makeRecord(self, *args, **kwargs):
        raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED RECORDS")

    def hasHandlers(self):
        raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED HANDLERS")
|