File size: 4,799 Bytes
0b11a42 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
import pandas as pd
import os
import errno
from pathlib import Path
from Bio.SeqIO.FastaIO import SimpleFastaParser
from datetime import datetime
from getpass import getuser
import logging
from rich.logging import RichHandler
from functools import wraps
from time import perf_counter
from typing import Callable
# Base directory under which make_output_dir() creates its dated output
# folders. It is a relative path, so it is resolved against the current
# working directory of the running process.
default_path = '../outputs/'
def humanize_time(time_in_seconds: float, /) -> str:
    """Return a human-readable string for a duration given in seconds.

    The unit is chosen from the value *after* rounding to one decimal, so
    boundary values never render as ``1000.0 ms``, ``60.0 sec`` or
    ``5 min 60.0 sec``.

    Parameters
    ----------
    time_in_seconds : float
        Duration in seconds (may be fractional). Negative durations are
        rendered with a leading ``"- "``.

    Returns
    -------
    str
        A description of the time in one of the forms:
        - 300.1 ms
        - 4.5 sec
        - 5 min 43.1 sec
    """
    sgn = "" if time_in_seconds >= 0 else "- "
    seconds = abs(time_in_seconds)

    # Format first, then decide: the formatted text is what the user sees, so
    # the unit has to be picked from the rounded value, not the raw one.
    millis_text = f"{seconds * 1e3:.1f}"
    if seconds < 1 and float(millis_text) < 1000:
        return f"{sgn}{millis_text} ms"

    seconds_text = f"{seconds:.1f}"
    if seconds < 60 and float(seconds_text) < 60:
        return f"{sgn}{seconds_text} sec"

    minutes, remainder = divmod(seconds, 60)
    remainder_text = f"{remainder:.1f}"
    if float(remainder_text) >= 60:
        # The remainder rounded up to a full minute: carry it over.
        minutes += 1
        remainder_text = "0.0"
    return f"{sgn}{int(minutes)} min {remainder_text} sec"
class log_time:
    """A decorator / context manager to log the time a function / code block took.

    Usage either with:

        @log_time(log)
        def function_getting_logged_every_time(…):
            …

    producing:

        function_getting_logged_every_time took 5.0 sec.

    or:

        with log_time(log, "Name of this codeblock"):
            …

    producing:

        Name of this codeblock took 5.0 sec.
    """

    def __init__(self, logger: logging.Logger, name: str = None):
        """
        Parameters
        ----------
        logger : logging.Logger
            The logger to use for logging the time, if None use print.
        name : str, optional
            The name in the message, when used as a decorator this defaults
            to the qualified name of the decorated function, by default None
        """
        self.logger = logger
        self.name = name

    def __call__(self, func: Callable):
        """Wrap ``func`` so that every call is timed and logged.

        The label is resolved once per decorated function and is NOT stored
        on this instance, so one ``log_time`` object can decorate several
        functions without all of them reporting the first function's name.
        """
        label = func.__qualname__ if self.name is None else self.name
        logger = self.logger

        @wraps(func)
        def inner(*args, **kwds):
            # A fresh timer per call keeps recursive or nested calls from
            # overwriting each other's start time.
            with log_time(logger, label):
                return func(*args, **kwds)

        return inner

    def __enter__(self):
        """Start the timer and return this instance (usable with ``as``)."""
        self.start_time = perf_counter()
        return self

    def __exit__(self, *exc):
        """Stop the timer and report the elapsed time.

        Returns None, so an exception raised inside the block still
        propagates after the duration has been reported.
        """
        self.exit_time = perf_counter()
        time_delta = humanize_time(self.exit_time - self.start_time)
        if self.logger is None:
            print(f"{self.name} took {time_delta}.")
        else:
            self.logger.info(f"{self.name} took {time_delta}.")
def write_2_log(log_file):
    """Configure root logging to write to ``log_file`` and to the console.

    Two handlers are created, both at INFO level: a ``logging.FileHandler``
    with a timestamped format, and a ``RichHandler`` that shows the bare
    message. They are registered through ``logging.basicConfig``.

    Parameters
    ----------
    log_file : path-like
        File the FileHandler writes the log records to.
    """
    file_layout = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    console_layout = logging.Formatter("%(message)s")

    to_file = logging.FileHandler(log_file)
    to_console = RichHandler()

    # Both sinks share the same threshold; only the message layout differs.
    for sink, layout in ((to_file, file_layout), (to_console, console_layout)):
        sink.setLevel(logging.INFO)
        sink.setFormatter(layout)

    logging.basicConfig(level=logging.INFO, datefmt="[%X]", handlers=[to_file, to_console])
def fasta2df(path):
    """Load a FASTA file into a single-column pandas DataFrame.

    Parameters
    ----------
    path : path-like
        Location of the FASTA file to read.

    Returns
    -------
    pandas.DataFrame
        One row per record, indexed by the full header yielded by
        ``SimpleFastaParser``, with a ``sequence`` column in which every
        upper-case ``U`` has been replaced by ``T``.
    """
    with open(path) as handle:
        records = list(SimpleFastaParser(handle))

    titles = [title for title, _ in records]
    residues = [sequence for _, sequence in records]

    table = pd.DataFrame(data=residues, index=titles, columns=['sequence'])
    # Rewrite RNA-style records in the DNA alphabet.
    table['sequence'] = table.sequence.apply(lambda bases: bases.replace('U', 'T'))
    return table
def fasta2df_subheader(path, id_pos):
    """Load a FASTA file into a DataFrame indexed by one field of each header.

    Parameters
    ----------
    path : path-like
        Location of the FASTA file to read.
    id_pos : int
        Position of the whitespace-separated header field to use as the
        row identifier.

    Returns
    -------
    pandas.DataFrame
        One row per record, indexed by the selected header field, with a
        ``sequence`` column in which every upper-case ``U`` has been
        replaced by ``T``.

    Raises
    ------
    IndexError
        If a header has no field at position ``id_pos``.
    """
    labels, residues = [], []
    with open(path) as handle:
        for title, sequence in SimpleFastaParser(handle):
            fields = title.split()  # split on any run of whitespace
            labels.append(fields[id_pos])
            residues.append(sequence)

    table = pd.DataFrame(data=residues, index=labels, columns=['sequence'])
    # Rewrite RNA-style records in the DNA alphabet.
    table['sequence'] = table.sequence.apply(lambda bases: bases.replace('U', 'T'))
    return table
def build_bowtie_index(bowtie_index_file):
    """Build a bowtie index by running the external ``bowtie-build`` tool.

    The index is rebuilt on every call; no check for an existing index is
    made. The tool is started without a shell and with an explicit argument
    list, so prefixes containing spaces or shell metacharacters are passed
    through literally instead of being interpreted.

    Failures are reported through ``logging`` rather than raised, because
    the previous implementation never raised on a failed build either.

    Parameters
    ----------
    bowtie_index_file : str
        Index prefix. The reference FASTA is expected at
        ``<bowtie_index_file>.fa`` and the same prefix is passed to
        ``bowtie-build`` as the output name.
    """
    import subprocess  # local import: only this function needs it

    print('-------- index is build --------')
    command = ["bowtie-build", f"{bowtie_index_file}.fa", f"{bowtie_index_file}"]
    try:
        completed = subprocess.run(command, check=False)
    except OSError as err:
        # Typically: the bowtie-build executable is not on PATH.
        logging.getLogger(__name__).error("could not run bowtie-build: %s", err)
        return
    if completed.returncode != 0:
        logging.getLogger(__name__).error(
            "bowtie-build exited with status %d for index %s",
            completed.returncode,
            bowtie_index_file,
        )
def make_output_dir(fasta_file):
    """Create (if needed) and return the dated output directory for a FASTA file.

    The directory is ``<default_path><YYYY-MM-DD>__<name>/`` where ``<name>``
    is ``fasta_file`` with one trailing ``.fasta`` or ``.fa`` extension
    removed. Only a trailing extension is stripped, so names that merely
    contain ``.fa`` somewhere in the middle are left intact.

    Parameters
    ----------
    fasta_file : str
        Name of the FASTA file the outputs belong to.

    Returns
    -------
    str
        The output directory path, ending with ``/``.

    Raises
    ------
    OSError
        If the directory cannot be created, including when the path already
        exists but is not a directory.
    """
    run_name = fasta_file
    # Check the longer extension first and strip at most one of them.
    for extension in ('.fasta', '.fa'):
        if run_name.endswith(extension):
            run_name = run_name[:-len(extension)]
            break

    output_dir = default_path + datetime.now().strftime('%Y-%m-%d') + '__' + run_name + '/'
    # An already existing directory is fine: runs on the same day share it.
    os.makedirs(output_dir, exist_ok=True)
    return output_dir
def reverse_complement(seq):
    """Return the reverse complement of a DNA sequence.

    Besides upper-case ``A``/``C``/``G``/``T``, soft-masked (lower-case)
    bases and the ambiguity code ``N`` are accepted; case is preserved and
    ``N`` complements to itself.

    Parameters
    ----------
    seq : str
        DNA sequence to process.

    Returns
    -------
    str
        The complemented sequence, read in reverse order.

    Raises
    ------
    KeyError
        If ``seq`` contains a character outside ``ACGTN`` / ``acgtn``.
    """
    complement = {
        'A': 'T', 'C': 'G', 'G': 'C', 'T': 'A', 'N': 'N',
        'a': 't', 'c': 'g', 'g': 'c', 't': 'a', 'n': 'n',
    }
    return ''.join(complement[base] for base in reversed(seq))
|