import json |
import logging |
import os |
import pickle |
from pathlib import Path |
from typing import Any, List |
import anndata |
import dill |
import matplotlib as mpl |
import matplotlib.pyplot as plt |
import numpy as np |
import pandas as pd |
import yaml |
from anndata import AnnData |
from Bio.SeqIO.FastaIO import SimpleFastaParser |
logger = logging.getLogger(__name__) |
def create_dirs(paths: List):
    """Create every directory in ``paths`` that does not exist yet.

    Missing parent directories are created as well. Paths that already exist
    are left untouched.

    Parameters
    ----------
    paths : List
        The directory paths to create.
    """
    for path in paths:
        if not os.path.exists(path):
            # `makedirs` also creates intermediate directories, and
            # `exist_ok=True` keeps this safe if another process creates the
            # directory between the check above and this call.
            os.makedirs(path, exist_ok=True)
def save(path: Path, data: object, ignore_ext: bool = False) -> Path:
    """Save ``data`` to ``path``, choosing the file format from the type of ``data``.

    The matching extension is appended to the path unless the path already
    ends with it or ``ignore_ext`` is set. Missing parent directories are
    created.

    At the moment we handle:
    - pyplot figures -> .pdf (the figure is closed after saving)
    - dictionaries -> .yaml
    - lists -> .yaml
    - numpy arrays -> .npy
    - pandas dataframes -> .tsv
    - anndata -> .h5ad
    - strings -> .txt
    - _anything else_ -> .p (pickled with `dill`)

    For AnnData objects, the `.obs` columns listed under
    ``anndata.obs.datetime_columns`` in `tcga_anndata_groupings.yaml` (located
    next to this module) that hold datetime values are converted to
    ``YYYY-MM-DD`` strings before writing. This modifies ``data.obs`` in place.

    Parameters
    ----------
    path : Path
        The full path to save to
    data : object
        Data to save
    ignore_ext : bool
        Whether to skip adding the normally expected extension

    Returns
    -------
    Path
        The final path to the file
    """
    if not isinstance(path, Path):
        path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)

    def make_path(p: Path, ext: str) -> Path:
        """Return ``p`` with ``ext`` appended unless it already ends with it.

        The path is returned unchanged when ``ignore_ext`` is set.
        """
        if not ignore_ext and not p.name.endswith(ext):
            return p.parent.joinpath(f"{p.name}{ext}")
        return p

    if isinstance(data, mpl.figure.Figure):
        path = make_path(path, ".pdf")
        data.savefig(path)
        plt.close(data)
    elif isinstance(data, (dict, list)):
        path = make_path(path, ".yaml")
        with open(path, "w") as fp:
            yaml.dump(data, fp)
    elif isinstance(data, np.ndarray):
        path = make_path(path, ".npy")
        np.save(path, data)
    elif isinstance(data, pd.DataFrame):
        path = make_path(path, ".tsv")
        data.to_csv(path, sep="\t")
    elif isinstance(data, anndata.AnnData):
        path = make_path(path, ".h5ad")
        # The groupings file is only needed for AnnData objects, so it is read
        # here rather than on every call; saving any other type must not
        # depend on this file being present.
        module_dir = os.path.dirname(os.path.abspath(__file__))
        with open(os.path.join(module_dir, "tcga_anndata_groupings.yaml"), "r") as stream:
            tcga_annotations = yaml.safe_load(stream)
        configured_dates = set(tcga_annotations['anndata']['obs']['datetime_columns'])
        for date_col in configured_dates & set(data.obs.columns):
            if "datetime" in data.obs[date_col].dtype.name:
                # Dates are stored as plain ISO strings in the written file.
                data.obs[date_col] = data.obs[date_col].dt.strftime("%Y-%m-%d")
            else:
                logger.info(f"Column {date_col} in obs should be a date but isnt formatted as one.")
        data.write(path)
    elif isinstance(data, str):
        path = make_path(path, ".txt")
        with open(path, "w") as fp:
            fp.write(data)
    else:
        path = make_path(path, ".p")
        # Context manager so the handle is flushed and closed deterministically.
        with open(path, "wb") as fp:
            dill.dump(data, fp)
    return path
def _resolve_path(path: Path) -> Path:
    """Resolve a path to one existing file.

    The path is first made absolute (``~`` is expanded). Then:
    1. If it exists, it is returned as is.
    2. Otherwise its last component is used as a glob pattern (a trailing
       ``*`` is added if missing) inside the parent directory, and the match
       is returned if there is exactly one.

    Parameters
    ----------
    path : Path
        The path, or a unique prefix / glob pattern of a file name

    Returns
    -------
    Path
        The absolute resolved file.

    Raises
    ------
    FileNotFoundError
        If the file doesn't exist and the glob matches no file or more than
        one file.
    """
    # Always normalize, so the existence check and the glob below look at the
    # same absolute location regardless of how the caller spelled the path.
    path = path.expanduser().resolve()
    if path.exists():
        return path

    pattern = path.name if path.name.endswith("*") else path.name + "*"
    matches = list(path.parent.glob(pattern))
    if len(matches) == 1:
        return matches[0]
    raise FileNotFoundError(
        f"Was trying to resolve path\n\t{path.parent / pattern}\n"
        f"but it was ambiguous: found {len(matches)} matching files (expected exactly 1)."
    )
def _to_int_string(element: Any) -> str:
    """Format a number as a string without decimals, suitable for categories.

    Parameters
    ----------
    element : Any
        The value to format; anything accepted by ``float()`` is converted

    Returns
    -------
    str
        The value rounded to zero decimals and formatted as a string, or the
        original input unchanged if it cannot be converted to a float
    """
    try:
        number = float(element)
    except (TypeError, ValueError, OverflowError):
        # Not number-like (or too large for a float): hand the input back
        # untouched. Only conversion errors are caught, so interrupts and
        # exit requests still propagate.
        return element
    return f"{number:0.0f}"
def cast_anndata(ad: AnnData) -> None:
    """Fixes the data-type in the `.obs` and `.var` DataFrame columns of an
    AnnData object. __Works in-place__. Currently does the following, in order:

    1. Casts numerical-categorical `.obs` columns to categoricals of
       integer-formatted strings
    2. Makes all remaining string `.obs`/`.var` columns categoricals
       (via `AnnData.strings_to_categoricals`)
    3. Casts date-time `.obs` columns to pandas `datetime64[ns]`; a column
       that cannot be parsed is cast to string instead and a warning is logged
    4. Casts real string `.obs` columns to strings, not categoricals
    5. Casts numerical `.obs` columns to numbers (unparseable values become NaN)
    6. Casts boolean `.var` columns to `bool` (missing values become `False`)

    Which column belongs to which group is configured in
    `tcga_anndata_groupings.yaml`, located next to this module. Only columns
    that are both configured and present in the object are touched.

    Parameters
    ----------
    ad : AnnData
        The AnnData object
    """
    module_dir = os.path.dirname(os.path.abspath(__file__))
    with open(os.path.join(module_dir, "tcga_anndata_groupings.yaml"), 'r') as stream:
        tcga_annotations = yaml.safe_load(stream)
    obs_groups = tcga_annotations['anndata']['obs']
    var_groups = tcga_annotations['anndata']['var']

    numerical_categorical_columns = set(obs_groups['numerical_categorical_columns']) & set(ad.obs.columns)
    for column in numerical_categorical_columns:
        ad.obs[column] = ad.obs[column].apply(_to_int_string).astype("U").astype("category")

    ad.strings_to_categoricals()

    datetime_columns = set(obs_groups['datetime_columns']) & set(ad.obs.columns)
    for column in datetime_columns:
        try:
            ad.obs[column] = pd.to_datetime(ad.obs[column]).astype("datetime64[ns]")
        except ValueError as e:
            # Best effort: keep the column usable as text instead of failing
            # the whole cast because of one malformed date column.
            logger.warning(
                f"to_datetime error (parsing \"unparseable\"):\n {e}\n"
                f"Column {column} will be set as string not as datetime."
            )
            ad.obs[column] = ad.obs[column].astype("string")

    string_columns = set(obs_groups['string_columns']) & set(ad.obs.columns)
    for column in string_columns:
        ad.obs[column] = ad.obs[column].astype("string")

    numerical_columns = set(obs_groups['numerical_columns']) & set(ad.obs.columns)
    for column in numerical_columns:
        ad.obs[column] = pd.to_numeric(ad.obs[column], errors="coerce")

    boolean_columns = set(var_groups['boolean_columns']) & set(ad.var.columns)
    for column in boolean_columns:
        # Assign the result back instead of `fillna(..., inplace=True)` on a
        # column selection, which may act on a temporary copy and never reach
        # `ad.var`.
        ad.var[column] = ad.var[column].fillna(False).astype(bool)
def load(path: str, ext: str = None, **kwargs):
    """Loads the given filepath.

    This will use the extension of the filename to determine what to use for
    reading (if not overwritten).

    At the moment we handle:
    - pickled objects (.p), read with `dill`
    - numpy objects (.npy)
    - dataframes (.csv, .tsv)
    - json files (.json)
    - yaml files (.yaml)
    - anndata files (.h5ad), passed through `cast_anndata` after reading
    - excel files (.xlsx)
    - text (.txt)
    - fasta files (.fa), returned as a dataframe with a `Sequences` column

    Parameters
    ----------
    path : str
        The file-name of the cached file, without extension. (Or path)
        The file-name can be a glob match e.g. `/data/something/LC__*__21.7.2.*`
        which matches everything with anything filling the stars. This only
        works if there is only one match. So this is a shortcut if you do not
        know the full name but you know there is only one.
    ext : str, optional
        The extension to assume, ignoring the actual extension. E.g. loading
        "tsv" for a "something.csv" file with tab-limits, by default None.
        A leading dot and upper-case letters are tolerated.
    **kwargs
        Forwarded to the pandas reader for .tsv, .csv and .xlsx files;
        ignored for every other format.

    Returns
    -------
    Whatever is in the saved file.

    Raises
    ------
    FileNotFoundError
        If a given path doesn't exist or doesn't give a unique file path.
    NotImplementedError
        Trying to load a file with an extension we do not have loading code for.
    """
    path = _resolve_path(Path(path))
    if ext is None:
        ext = path.suffix[1:]
    # Accept ".tsv" / "TSV" as well as "tsv".
    ext = ext.lstrip(".").lower()

    if ext == "p":
        # `save` writes these files with `dill`, so read them with `dill` too.
        # NOTE: unpickling executes arbitrary code; only load trusted files.
        with open(path, "rb") as fp:
            return dill.load(fp)
    elif ext == "npy":
        return np.load(path)
    elif ext == "tsv":
        return pd.read_csv(path, sep="\t", **kwargs)
    elif ext == "csv":
        return pd.read_csv(path, **kwargs)
    elif ext == "json":
        with open(path) as fp:
            return json.load(fp)
    elif ext == "yaml":
        with open(path) as fp:
            return yaml.load(fp, Loader=yaml.SafeLoader)
    elif ext == "h5ad":
        ad = anndata.read_h5ad(path)
        cast_anndata(ad)
        return ad
    elif ext == "xlsx":
        return pd.read_excel(path, **kwargs)
    elif ext == "txt":
        with open(path, "r") as text_file:
            return text_file.read()
    elif ext == "fa":
        # Only the sequences are returned; the record titles are not used.
        with open(path) as fasta_file:
            seqs = [sequence for _title, sequence in SimpleFastaParser(fasta_file)]
        return pd.DataFrame({'Sequences': seqs})
    else:
        raise NotImplementedError(
            f"No loader available for extension '{ext}' (file: {path})"
        )