|
import pandas as pd |
|
import os |
|
import errno |
|
from pathlib import Path |
|
from Bio.SeqIO.FastaIO import SimpleFastaParser |
|
from datetime import datetime |
|
from getpass import getuser |
|
|
|
import logging |
|
from rich.logging import RichHandler |
|
from functools import wraps |
|
from time import perf_counter |
|
from typing import Callable |
|
|
|
# Base directory under which make_output_dir() creates the per-run output
# folders. It is a relative path, so it resolves against the current working
# directory of the process.
default_path = '../outputs/'
|
|
|
def humanize_time(time_in_seconds: float, /) -> str:
    """Return a nicely human-readable string of a time_in_seconds.

    The value is rounded to the displayed precision *before* the unit is
    chosen, so a value that rounds up across a unit boundary is carried into
    the next unit (59.96 s is reported as ``1 min 0.0 sec`` and never as
    ``60.0 sec``).

    Parameters
    ----------
    time_in_seconds : float
        Time in seconds, (not full seconds). Negative values are rendered
        with a leading ``"- "``.

    Returns
    -------
    str
        A description of the time in one of the forms:
        - 300.1 ms
        - 4.5 sec
        - 5 min 43.1 sec
    """
    sign = "" if time_in_seconds >= 0 else "- "
    seconds = abs(time_in_seconds)

    if seconds < 1:
        millis = round(seconds * 1e3, 1)
        if millis < 1000:
            return f"{sign}{millis:.1f} ms"
        # millis rounded up to 1000.0: fall through and report it as 1.0 sec.

    # Round once to the displayed precision so the branch taken below agrees
    # with the number that ends up being printed.
    seconds = round(seconds, 1)
    if seconds < 60:
        return f"{sign}{seconds:.1f} sec"

    minutes, remainder = divmod(seconds, 60)
    return f"{sign}{int(minutes)} min {remainder:.1f} sec"
|
|
|
|
|
class log_time:
    """A decorator / context manager to log the time a certain function / code block took.

    Usage either with:

        @log_time(log)
        def function_getting_logged_every_time(…):
            …

    producing:

        function_getting_logged_every_time took 5 sec.

    or:

        with log_time(log, "Name of this codeblock"):
            …

    producing:

        Name of this codeblock took 5 sec.

    When used as a decorator every call keeps its own start time, so
    recursive or overlapping calls of the decorated function each report
    their own duration.
    """

    def __init__(self, logger: logging.Logger, name: str = None):
        """
        Parameters
        ----------
        logger : logging.Logger
            The logger to use for logging the time, if None use print.
        name : str, optional
            The name in the message, when used as a decorator this defaults to the function name, by default None
        """
        self.logger = logger
        self.name = name

    def __call__(self, func: Callable):
        """Wrap ``func`` so that every call is timed and reported.

        The message uses ``self.name`` when one was given, otherwise the
        qualified name of ``func``. The instance itself is not modified, so
        one ``log_time`` object can decorate several functions.
        """
        label = func.__qualname__ if self.name is None else self.name

        @wraps(func)
        def inner(*args, **kwds):
            # The start time lives in a local variable rather than on the
            # shared instance, so nested calls cannot overwrite each other.
            started = perf_counter()
            try:
                return func(*args, **kwds)
            finally:
                # Report on normal return and on exceptions alike.
                self._report(label, perf_counter() - started)

        return inner

    def __enter__(self):
        """Start the timer and return this object (usable with ``as``)."""
        self.start_time = perf_counter()
        return self

    def __exit__(self, *exc):
        """Stop the timer and report the elapsed time.

        Returns None, so an exception raised inside the block propagates.
        """
        self.exit_time = perf_counter()
        self._report(self.name, self.exit_time - self.start_time)

    def _report(self, label, elapsed_seconds):
        """Emit "<label> took <duration>." via the logger, or print without one."""
        time_delta = humanize_time(elapsed_seconds)
        if self.logger is None:
            print(f"{label} took {time_delta}.")
        else:
            self.logger.info(f"{label} took {time_delta}.")
|
|
|
|
|
def write_2_log(log_file):
    """Set up root logging to write to ``log_file`` and to the console.

    Two INFO-level handlers are created: a ``logging.FileHandler`` on
    ``log_file`` whose records carry timestamp, logger name and level, and a
    ``RichHandler`` that shows the bare message. Both are handed to
    ``logging.basicConfig``.

    NOTE(review): ``logging.basicConfig`` is documented to do nothing when
    the root logger already has handlers - confirm this is called only once.

    Parameters
    ----------
    log_file : str or path-like
        Destination passed to ``logging.FileHandler``.
    """
    file_handler = logging.FileHandler(log_file)
    console_handler = RichHandler()

    handler_formats = (
        (file_handler, "%(asctime)s - %(name)s - %(levelname)s - %(message)s"),
        (console_handler, "%(message)s"),
    )
    for handler, message_format in handler_formats:
        handler.setLevel(logging.INFO)
        handler.setFormatter(logging.Formatter(message_format))

    logging.basicConfig(
        level=logging.INFO,
        datefmt="[%X]",
        handlers=[file_handler, console_handler],
    )
|
|
|
|
|
def fasta2df(path):
    """Load a FASTA file into a one-column DataFrame.

    Parameters
    ----------
    path : str or path-like
        FASTA file to read.

    Returns
    -------
    pandas.DataFrame
        One row per record yielded by ``SimpleFastaParser``: the index holds
        the record header, the ``sequence`` column holds the sequence with
        every upper-case ``U`` replaced by ``T``.
    """
    with open(path) as handle:
        # Materialise inside the with-block; the parser reads from the handle.
        records = list(SimpleFastaParser(handle))

    headers = [title for title, _ in records]
    sequences = [residues for _, residues in records]

    frame = pd.DataFrame(data=sequences, index=headers, columns=['sequence'])
    frame['sequence'] = frame['sequence'].apply(lambda residues: residues.replace('U', 'T'))
    return frame
|
|
|
|
|
|
|
def fasta2df_subheader(path, id_pos):
    """Load a FASTA file into a DataFrame indexed by one field of each header.

    Parameters
    ----------
    path : str or path-like
        FASTA file to read.
    id_pos : int
        Index of the whitespace-separated header field to use as identifier.
        A header with too few fields raises ``IndexError``.

    Returns
    -------
    pandas.DataFrame
        One row per record: the index holds the selected header field, the
        ``sequence`` column holds the sequence with every upper-case ``U``
        replaced by ``T``.
    """
    record_ids = []
    sequences = []
    with open(path) as handle:
        for title, residues in SimpleFastaParser(handle):
            fields = title.split()
            record_ids.append(fields[id_pos])
            sequences.append(residues)

    frame = pd.DataFrame(data=sequences, index=record_ids, columns=['sequence'])
    to_dna = lambda residues: residues.replace('U', 'T')
    frame['sequence'] = frame['sequence'].apply(to_dna)
    return frame
|
|
|
|
|
|
|
def build_bowtie_index(bowtie_index_file):
    """Build a bowtie index by running the external ``bowtie-build`` tool.

    The reference is read from ``<bowtie_index_file>.fa`` and the index is
    written under the base name ``bowtie_index_file``. The command is run
    from an argument list without a shell, so paths containing spaces or
    shell metacharacters are passed to ``bowtie-build`` verbatim.

    Parameters
    ----------
    bowtie_index_file : str or path-like
        Base name of the index (the FASTA path without its ``.fa`` suffix).

    Returns
    -------
    int
        Exit status of ``bowtie-build``; 127 when the executable cannot be
        found. A failing build is reported through this value and is not
        raised, as callers have never had to handle an exception here.
    """
    # Imported locally so this function does not rely on module-level imports.
    import subprocess
    import sys

    print('-------- index is build --------')
    command = ["bowtie-build", f"{bowtie_index_file}.fa", f"{bowtie_index_file}"]
    try:
        return subprocess.run(command, check=False).returncode
    except FileNotFoundError:
        # Same outcome a shell reports for a missing executable.
        print("bowtie-build: command not found", file=sys.stderr)
        return 127
|
|
|
|
|
|
|
|
|
def make_output_dir(fasta_file, base_dir=None):
    """Create (if needed) and return the dated output directory for a run.

    The directory is ``<base_dir>/<YYYY-MM-DD>__<name>/`` where ``<name>`` is
    ``fasta_file`` with one trailing ``.fasta`` or ``.fa`` extension removed.
    Only a trailing extension is stripped; a ``.fa`` elsewhere in the name is
    kept (``my.family.fa`` becomes ``my.family``).

    Parameters
    ----------
    fasta_file : str
        Name of the FASTA file the run is based on.
    base_dir : str or path-like, optional
        Parent directory for the output folder, by default the module-level
        ``default_path``.

    Returns
    -------
    str
        Path of the output directory, ending in ``/``.

    Raises
    ------
    OSError
        If the directory cannot be created, including when the path exists
        but is not a directory.
    """
    if base_dir is None:
        base_dir = default_path

    run_name = fasta_file
    for extension in ('.fasta', '.fa'):
        if run_name.endswith(extension):
            run_name = run_name[:-len(extension)]
            break

    date_stamp = datetime.now().strftime('%Y-%m-%d')
    output_dir = os.path.join(base_dir, f"{date_stamp}__{run_name}") + '/'
    # exist_ok makes repeated runs on the same day reuse the directory.
    os.makedirs(output_dir, exist_ok=True)
    return output_dir
|
|
|
|
|
# Complement of every IUPAC nucleotide code (DNA alphabet). Lower-case entries
# are derived so soft-masked sequence keeps its case.
_DNA_COMPLEMENT = {
    'A': 'T', 'C': 'G', 'G': 'C', 'T': 'A',
    'R': 'Y', 'Y': 'R', 'S': 'S', 'W': 'W',
    'K': 'M', 'M': 'K', 'B': 'V', 'V': 'B',
    'D': 'H', 'H': 'D', 'N': 'N',
}
_DNA_COMPLEMENT.update(
    {base.lower(): partner.lower() for base, partner in list(_DNA_COMPLEMENT.items())}
)


def reverse_complement(seq):
    """Return the reverse complement of a DNA sequence.

    Besides ``A``/``C``/``G``/``T`` this accepts ``N`` and the IUPAC
    ambiguity codes, in upper or lower case; the case of each base is kept.

    Parameters
    ----------
    seq : str
        DNA sequence.

    Returns
    -------
    str
        The complemented sequence in reverse order.

    Raises
    ------
    KeyError
        If ``seq`` contains a symbol that is not a DNA nucleotide code
        (for example ``U`` or a gap character).
    """
    return ''.join([_DNA_COMPLEMENT[base] for base in seq[::-1]])
|
|
|
|