File size: 15,894 Bytes
da855ff
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
import logging
import os
import sys

from copy import deepcopy
from dateutil.relativedelta import relativedelta


def _get_logger(path_to_file, console_level=logging.DEBUG, file_level=logging.WARNING):
    """ Create a logger object to write in the console and in a file

    :param path_to_file:        name of the log file
    :param console_level:       logging level to write in the console
    :param file_level:          logging level to write in the file

    :return: logger object
    """
    # create folder for log file
    os.makedirs(os.path.dirname(path_to_file), exist_ok=True)

    # create logger
    logger = logging.getLogger(path_to_file)
    logger.setLevel(logging.DEBUG)

    # make it display in the console
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(console_level)
    formatter = logging.Formatter('%(message)s', datefmt='%m/%d/%Y %I:%M:%S')
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    # make it write on file
    f_handler = logging.FileHandler(path_to_file)
    f_handler.setLevel(file_level)
    formatter_f = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%m/%d/%Y %I:%M:%S')
    f_handler.setFormatter(formatter_f)
    logger.addHandler(f_handler)

    return logger


def _get_root_logger(console_level=logging.DEBUG):
    """ Create a logger object to write in the console

    :param console_level:   logging level to write in the console

    :return: logger object
    """
    # create logger object
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)

    # make it display in the console
    console_handler = logging.StreamHandler(sys.stdout)

    # check if there is already one stream handler
    already_exist = False
    for l in logger.handlers:
        if type(console_handler) == type(l):
            already_exist = True

    if not already_exist:
        console_handler.setLevel(console_level)
        formatter = logging.Formatter('(root_logger) [%(levelname)s] %(message)s', datefmt='%m/%d/%Y %I:%M:%S')
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)

    return logger


def _format_logging_level_arg(log_level):
    """ Convert logging level arg to its corresponding int logging level

    :param log_level:   arg referring to the desired logging level

    :return: int corresponding to the desired logging level
    """
    if not isinstance(log_level, str):
        if log_level not in [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR]:
            print('LOGGING LEVEL NOT RECOGNIZED -- Setting it to DEBUG')
            return logging.DEBUG
        else:
            return log_level

    else:
        if log_level.lower().strip() == 'debug':
            return logging.DEBUG
        elif log_level.lower().strip() == 'info':
            return logging.INFO
        elif log_level.lower().strip() in ['warn', 'warning']:
            return logging.WARNING
        elif log_level.lower().strip() == 'error':
            return logging.ERROR
        else:
            print('LOGGING LEVEL NOT RECOGNIZED -- Setting it to DEBUG')
            return logging.DEBUG


def get_logger_from_arg(logger=None, console_level=logging.DEBUG, file_level=logging.WARNING):
    """ Create logger instance to display information

    :param logger:          either None, string or logger instance
                            can also be a dict of keywords: {'logger': .., console_level: .., file_level: ..}
    :param console_level:   (string or int) logging level to write in the console
    :param file_level:      (string or int) logging level to write in a file

    :return: logger object
    """
    # check if keyword arguments were passed into a dictionary
    if isinstance(logger, dict):
        logger = get_logger_from_arg(**logger)

    # directly check if logger already exists
    elif not isinstance(logger, type(logging.getLogger())) and not isinstance(logger, type(logging.getLogger('dummy'))):
        # convert arguments to logging.level (ints)
        console_level = _format_logging_level_arg(console_level)
        file_level = _format_logging_level_arg(file_level)

        # if the logger is an empty string, return the console print logger
        if logger == '' or logger is None:
            logger = ConsolePrintLogger(console_level)

        # if logger is a string, it is assumed to be the path of the logger object
        elif isinstance(logger, str):
            logger = _get_logger(logger, console_level, file_level)

        # if it is a fake logger continue
        elif isinstance(logger, FakeLogger) or isinstance(logger, ConsolePrintLogger):
            pass

        # default logger
        else:
            logger = ConsolePrintLogger()

    return logger


def get_args_from_logger(logger):
    """ Retrieve args that were given to create logger object

    :param logger:  logger object that was created using get_logger_from_arg()

    :return: dictionary -- {'logger': .., 'console_level': .., 'file_level': ..}
    """
    # set default values
    kwargs = {'logger': None, 'console_level': logging.DEBUG, 'file_level': logging.WARNING}

    # iterate over handlers
    for handler in logger.handlers:
        if isinstance(handler, logging.FileHandler):
            kwargs['file_level'] = handler.level
            kwargs['logger'] = handler.baseFilename

        elif isinstance(handler, logging.StreamHandler):
            kwargs['console_level'] = handler.level

    return kwargs


def prog_bar(i, n, bar_size=16):
    """ Create a progress bar to estimate remaining time

    :param i:           current iteration
    :param n:           total number of iterations
    :param bar_size:    size of the bar

    :return: a visualisation of the progress bar
    """
    bar = ''
    done = (i * bar_size) // n

    for j in range(bar_size):
        bar += '█' if j <= done else '░'

    message = f'{bar} {i}/{n}'
    return message


def estimate_required_time(nb_items_in_list, current_index, time_elapsed, interval=100):
    """ Compute a remaining time estimation to process all items contained in a list

    :param nb_items_in_list:        all list items that have to be processed
    :param current_index:           current list index, contained in [0, nb_items_in_list - 1]
    :param time_elapsed:            time elapsed to process current_index items in the list
    :param interval:                estimate remaining time when (current_index % interval) == 0

    :return: time elapsed since the last time estimation
    """
    current_index += 1  # increment current_idx by 1
    if current_index % interval == 0 or current_index == nb_items_in_list:
        # make time estimation and put to string format
        seconds = (nb_items_in_list - current_index) * (time_elapsed / current_index)
        time_estimation = relativedelta(seconds=int(seconds))
        time_estimation_string = f'{time_estimation.hours:02}:{time_estimation.minutes:02}:{time_estimation.seconds:02}'

        # extract progress bar
        progress_bar = prog_bar(i=current_index, n=nb_items_in_list)

        # display info
        if current_index == nb_items_in_list:
            sys.stdout.write(f'\r{progress_bar} -- estimated required time = {time_estimation_string} -- Finished!')
        else:
            sys.stdout.write(f'\r{progress_bar} -- estimated required time = {time_estimation_string}')


def simple_table(item_tuples, logger=None):
    """ Display tuple items in a table

    :param item_tuples:     items to display. Each tuple item is composed of two components (heading, cell)
    :param logger:          arg to create logger object
    """
    # get logger object
    logger = get_logger_from_arg(logger)

    # initialize variables
    border_pattern = '+---------------------------------------'
    whitespace = '                                            '

    # extract table items
    headings, cells, = [], []
    for item in item_tuples:
        # extract heading and cell
        heading, cell = str(item[0]), str(item[1])

        # create padding
        pad_head = True if len(heading) < len(cell) else False
        pad = abs(len(heading) - len(cell))
        pad = whitespace[:pad]
        pad_left = pad[:len(pad)//2]
        pad_right = pad[len(pad)//2:]

        if pad_head:  # pad heading
            heading = pad_left + heading + pad_right
        else:  # pad cell
            cell = pad_left + cell + pad_right

        headings += [heading]
        cells += [cell]

    # create the table
    border, head, body = '', '', ''
    for i in range(len(item_tuples)):
        temp_head = f'| {headings[i]} '
        temp_body = f'| {cells[i]} '

        border += border_pattern[:len(temp_head)]
        head += temp_head
        body += temp_body

        if i == len(item_tuples) - 1:
            head += '|'
            body += '|'
            border += '+'

    # display the table
    logger.info(border)
    logger.info(head)
    logger.info(border)
    logger.info(body)
    logger.info(border)
    logger.info(' ')


def get_all_handler_parameters_from_logger(logger):
    """ Extract handler parameters from a logger object

    :param logger:      logger object

    :return: list of handler parameters contained in logger object
    """
    # initialize list
    handler_list = list()

    # iterate over handler parameters
    for h in logger.handlers:
        if isinstance(h, logging.FileHandler):
            handler_list.append({'type': type(h),
                                 'level': h.level,
                                 'file_name': h.baseFilename,
                                 'format': deepcopy(h.formatter),
                                 'object': None})
        else:
            handler_list.append({'type': type(h),
                                 'level': h.level,
                                 'format':deepcopy(h.formatter),
                                 'object': None})

    return handler_list


class FakeLogger:
    """
        FakeLogger is used in multi-processed functions. It replaces a normal logger object.
        It packages message and send them to a queue that a listener will read and write in a proper logger object.
    """
    # class variables
    queue = None
    propagate = False
    level = 0

    def __init__(self, queue):
        self.queue = queue

    def send_fake_message_on_queue(self, level, msg):
        self.queue.put((level, msg))

    def send_warning_of_fake_logger(self, msg):
        msg = f'Logging when using LaFAT multiprocess facilities disables some logger functionality such as: {msg}'
        self.send_fake_message_on_queue(level=logging.WARNING, msg=msg)

    def critical(self, message, *args, **kwargs):
        self.send_fake_message_on_queue(level=logging.CRITICAL, msg=message)

    def error(self, message, *args, **kwargs):
        self.send_fake_message_on_queue(level=logging.ERROR, msg=message)

    def warn(self, message, *args, **kwargs):
        self.send_fake_message_on_queue(level=logging.WARNING, msg=message)

    def warning(self, message, *args, **kwargs):
        self.send_fake_message_on_queue(level=logging.WARNING, msg=message)

    def info(self, message, *args, **kwargs):
        self.send_fake_message_on_queue(level=logging.INFO, msg=message)

    def debug(self, message, *args, **kwargs):
        self.send_fake_message_on_queue(level=logging.DEBUG, msg=message)

    def log(self, message, *args, **kwargs):
        self.send_fake_message_on_queue(level=self.level, msg=message)

    def exception(self, message, *args, **kwargs):
        self.send_fake_message_on_queue(level=logging.ERROR, msg=message)

    def handle(self, record):
        self.queue.put(record)

    def setLevel(self, level):
        self.level = level

    def isEnabledFor(self, level):
        self.send_warning_of_fake_logger(f'isEnabledFor({level})')
        return True

    def getEffectiveLevel(self):
        self.send_warning_of_fake_logger(f'getEffectiveLevel()')
        return 0

    def getChild(self, *args, **kwargs):
        self.send_warning_of_fake_logger(f'getChild()')
        return None

    def addFilter(self, *args, **kwargs):
        self.send_warning_of_fake_logger(f'addFilter()')

    def removeFilter(self, *args, **kwargs):
        self.send_warning_of_fake_logger(f'removeFilter()')

    def filter(self, *args, **kwargs):
        self.send_warning_of_fake_logger(f'filter()')

    def addHandler(self, *args, **kwargs):
        self.send_warning_of_fake_logger(f'addHandler()')

    def removeHandler(self, *args, **kwargs):
        self.send_warning_of_fake_logger(f'removeHandler()')

    def findCaller(self, *args, **kwargs):
        self.send_warning_of_fake_logger(f'findCaller()')

    def makeRecord(self, *args, **kwargs):
        self.send_warning_of_fake_logger(f'makeRecord()')

    def hasHandlers(self):
        return False


class ConsolePrintLogger:
    """
        FakeLogger is used in multi-processed functions. It replaces a normal logger object.
        It packages message and send them to a queue that a listener will read and write in a proper logger object.
    """
    # class variables
    level = 0

    def __init__(self, level=0):
        self.level = level

    def critical(self, message, *args, **kwargs):
        if self.level <= logging.CRITICAL:
            print(f"[CRITICAL]: {message}")

    def error(self, message, *args, **kwargs):
        if self.level <= logging.ERROR:
            print(f"[ERROR]: {message}")

    def warn(self, message, *args, **kwargs):
        if self.level <= logging.WARNING:
            print(f"[WARNING]: {message}")

    def warning(self, message, *args, **kwargs):
        if self.level <= logging.WARNING:
            print(f"[WARNING]: {message}")

    def info(self, message, *args, **kwargs):
        if self.level <= logging.INFO:
            print(f"[INFO]: {message}")

    def debug(self, message, *args, **kwargs):
        if self.level <= logging.DEBUG:
            print(f"[DEBUG]: {message}")

    def log(self, message, *args, **kwargs):
        print(f"[LOG]: {message}")

    def exception(self, message, *args, **kwargs):
        print(f"[EXCEPTION]: {message}")

    def handle(self, record):
        raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED HANDLES")

    def setLevel(self, level):
        raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED LEVELS")

    def isEnabledFor(self, level):
        return True

    def getEffectiveLevel(self):
        raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED LEVELS")
        return 0

    def getChild(self, *args, **kwargs):
        raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE CHILDS")
        return None

    def addFilter(self, *args, **kwargs):
        raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED FILTERS")

    def removeFilter(self, *args, **kwargs):
        raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED FILTERS")

    def filter(self, *args, **kwargs):
        raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED FILTERS")

    def addHandler(self, *args, **kwargs):
        raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED HANDLERS")

    def removeHandler(self, *args, **kwargs):
        raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED HANDLERS")

    def findCaller(self, *args, **kwargs):
        raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED CALLERS")

    def makeRecord(self, *args, **kwargs):
        raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED RECORDS")

    def hasHandlers(self):
        raise Exception("THIS IS MOCK LOGGER, CREATE A REAL ONE IF YOU NEED HANDLERS")