Soumic
:zap: Ready the app.py for next experiment. Looks ok
9e90264
raw
history blame
13.4 kB
import os
import random
import huggingface_hub
import numpy as np
from datasets import load_dataset, Dataset
from dotenv import load_dotenv
from pytorch_lightning import LightningDataModule
from pytorch_lightning.utilities.types import TRAIN_DATALOADERS, EVAL_DATALOADERS
from torch.utils.data import DataLoader, IterableDataset
from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score, roc_auc_score
# from torchmetrics.classification import BinaryAccuracy, BinaryAUROC, BinaryF1Score, BinaryPrecision, BinaryRecall
from transformers import AutoModelForSequenceClassification, AutoTokenizer, AutoModel, BertModel
from transformers import TrainingArguments, Trainer
import torch
import logging
import wandb
timber = logging.getLogger()
# logging.basicConfig(level=logging.DEBUG)
logging.basicConfig(level=logging.INFO) # change to level=logging.DEBUG to print more logs...
black = "\u001b[30m"
red = "\u001b[31m"
green = "\u001b[32m"
yellow = "\u001b[33m"
blue = "\u001b[34m"
magenta = "\u001b[35m"
cyan = "\u001b[36m"
white = "\u001b[37m"
FORWARD = "FORWARD_INPUT"
BACKWARD = "BACKWARD_INPUT"
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
PRETRAINED_MODEL_NAME: str = "zhihan1996/DNA_bert_6"
def insert_debug_motif_at_random_position(seq, DEBUG_MOTIF):
start = 0
end = len(seq)
rand_pos = random.randrange(start, (end - len(DEBUG_MOTIF)))
random_end = rand_pos + len(DEBUG_MOTIF)
output = seq[start: rand_pos] + DEBUG_MOTIF + seq[random_end: end]
assert len(seq) == len(output)
return output
class PagingMQTLDataset(IterableDataset):
def __init__(self,
m_dataset,
seq_len,
tokenizer,
max_length=512,
check_if_pipeline_is_ok_by_inserting_debug_motif=False):
self.dataset = m_dataset
self.check_if_pipeline_is_ok_by_inserting_debug_motif = check_if_pipeline_is_ok_by_inserting_debug_motif
self.debug_motif = "ATCGCCTA"
self.seq_len = seq_len
self.bert_tokenizer = tokenizer
self.max_length = max_length
pass
def __iter__(self):
for row in self.dataset:
processed = self.preprocess(row)
if processed is not None:
yield processed
def preprocess(self, row):
sequence = row['sequence'] # Fetch the 'sequence' column
if len(sequence) != self.seq_len:
return None # skip problematic row!
label = row['label'] # Fetch the 'label' column (or whatever target you use)
if label == 1 and self.check_if_pipeline_is_ok_by_inserting_debug_motif:
sequence = insert_debug_motif_at_random_position(seq=sequence, DEBUG_MOTIF=self.debug_motif)
input_ids = self.bert_tokenizer(sequence)["input_ids"]
tokenized_tensor = torch.tensor(input_ids)
label_tensor = torch.tensor(label)
output_dict = {"input_ids": tokenized_tensor, "labels": label_tensor} # so this is now you do it?
return output_dict # tokenized_tensor, label_tensor
class MqtlDataModule(LightningDataModule):
def __init__(self, train_ds, val_ds, test_ds, batch_size=16):
super().__init__()
self.batch_size = batch_size
self.train_loader = DataLoader(train_ds, batch_size=self.batch_size, shuffle=False,
# collate_fn=collate_fn,
num_workers=1,
# persistent_workers=True
)
self.validate_loader = DataLoader(val_ds, batch_size=self.batch_size, shuffle=False,
# collate_fn=collate_fn,
num_workers=1,
# persistent_workers=True
)
self.test_loader = DataLoader(test_ds, batch_size=self.batch_size, shuffle=False,
# collate_fn=collate_fn,
num_workers=1,
# persistent_workers=True
)
pass
def prepare_data(self):
pass
def setup(self, stage: str) -> None:
timber.info(f"inside setup: {stage = }")
pass
def train_dataloader(self) -> TRAIN_DATALOADERS:
return self.train_loader
def val_dataloader(self) -> EVAL_DATALOADERS:
return self.validate_loader
def test_dataloader(self) -> EVAL_DATALOADERS:
return self.test_loader
def create_paging_train_val_test_datasets(tokenizer, WINDOW, is_debug, batch_size=1000):
data_files = {
# small samples
"train_binned_200": "/home/soumic/Codes/mqtl-classification/src/inputdata/dataset_200_train_binned.csv",
"validate_binned_200": "/home/soumic/Codes/mqtl-classification/src/inputdata/dataset_200_validate_binned.csv",
"test_binned_200": "/home/soumic/Codes/mqtl-classification/src/inputdata/dataset_200_test_binned.csv",
# medium samples
"train_binned_1000": "/home/soumic/Codes/mqtl-classification/src/inputdata/dataset_1000_train_binned.csv",
"validate_binned_1000": "/home/soumic/Codes/mqtl-classification/src/inputdata/dataset_1000_validate_binned.csv",
"test_binned_1000": "/home/soumic/Codes/mqtl-classification/src/inputdata/dataset_1000_test_binned.csv",
# large samples
"train_binned_4000": "/home/soumic/Codes/mqtl-classification/src/inputdata/dataset_4000_train_binned.csv",
"validate_binned_4000": "/home/soumic/Codes/mqtl-classification/src/inputdata/dataset_4000_validate_binned.csv",
"test_binned_4000": "/home/soumic/Codes/mqtl-classification/src/inputdata/dataset_4000_test_binned.csv",
}
dataset_map = None
is_my_laptop = os.path.isfile("/home/soumic/Codes/mqtl-classification/src/inputdata/dataset_4000_train_binned.csv")
if is_my_laptop:
dataset_map = load_dataset("csv", data_files=data_files, streaming=True)
else:
dataset_map = load_dataset("fahimfarhan/mqtl-classification-datasets", streaming=True)
train_dataset = PagingMQTLDataset(dataset_map[f"train_binned_{WINDOW}"],
check_if_pipeline_is_ok_by_inserting_debug_motif=is_debug,
tokenizer=tokenizer,
seq_len=WINDOW
)
val_dataset = PagingMQTLDataset(dataset_map[f"validate_binned_{WINDOW}"],
check_if_pipeline_is_ok_by_inserting_debug_motif=is_debug,
tokenizer=tokenizer,
seq_len=WINDOW)
test_dataset = PagingMQTLDataset(dataset_map[f"test_binned_{WINDOW}"],
check_if_pipeline_is_ok_by_inserting_debug_motif=is_debug,
tokenizer=tokenizer,
seq_len=WINDOW)
# data_module = MqtlDataModule(train_ds=train_dataset, val_ds=val_dataset, test_ds=test_dataset, batch_size=batch_size)
return train_dataset, val_dataset, test_dataset
def login_inside_huggingface_virtualmachine():
# Load the .env file, but don't crash if it's not found (e.g., in Hugging Face Space)
try:
load_dotenv() # Only useful on your laptop if .env exists
print(".env file loaded successfully.")
except Exception as e:
print(f"Warning: Could not load .env file. Exception: {e}")
# Try to get the token from environment variables
try:
token = os.getenv("HF_TOKEN")
if not token:
raise ValueError("HF_TOKEN not found. Make sure to set it in the environment variables or .env file.")
# Log in to Hugging Face Hub
huggingface_hub.login(token)
print("Logged in to Hugging Face Hub successfully.")
except Exception as e:
print(f"Error during Hugging Face login: {e}")
# Handle the error appropriately (e.g., exit or retry)
# wand db login
try:
api_key = os.getenv("WAND_DB_API_KEY")
timber.info(f"{api_key = }")
if not api_key:
raise ValueError("WAND_DB_API_KEY not found. Make sure to set it in the environment variables or .env file.")
# Log in to Hugging Face Hub
wandb.login(key=api_key)
print("Logged in to wand db successfully.")
except Exception as e:
print(f"Error during wand db Face login: {e}")
pass
# use sklearn cz torchmetrics.classification gave array index out of bound exception :/ (whatever it is called in python)
def compute_metrics_using_sklearn(p):
try:
pred, labels = p
# Get predicted class labels
pred_labels = np.argmax(pred, axis=1)
# Get predicted probabilities for the positive class
pred_probs = pred[:, 1] # Assuming binary classification and 2 output classes
accuracy = accuracy_score(y_true=labels, y_pred=pred_labels)
recall = recall_score(y_true=labels, y_pred=pred_labels)
precision = precision_score(y_true=labels, y_pred=pred_labels)
f1 = f1_score(y_true=labels, y_pred=pred_labels)
roc_auc = roc_auc_score(y_true=labels, y_score=pred_probs)
return {"accuracy": accuracy, "roc_auc": roc_auc, "precision": precision, "recall": recall, "f1": f1}
except Exception as x:
print(f"compute_metrics_using_sklearn failed with exception: {x}")
return {"accuracy": 0, "roc_auc": 0, "precision": 0, "recall": 0, "f1": 0}
def start():
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
login_inside_huggingface_virtualmachine()
WINDOW = 4000
batch_size = 100
model_local_directory = f"my-awesome-model-{WINDOW}"
model_remote_repository = f"fahimfarhan/dnabert-6-mqtl-classifier-{WINDOW}"
is_my_laptop = os.path.isfile("/home/soumic/Codes/mqtl-classification/src/inputdata/dataset_4000_train_binned.csv")
tokenizer = AutoTokenizer.from_pretrained(PRETRAINED_MODEL_NAME, trust_remote_code=True)
classifier_model = AutoModelForSequenceClassification.from_pretrained(PRETRAINED_MODEL_NAME, num_labels=2)
args = {
"output_dir": "output_dnabert-6-mqtl_classification",
"num_train_epochs": 1,
"max_steps": 20_000, # train 36k + val 4k = 40k
# Set the number of steps you expect to train, originally 1000, takes too much time. So I set it to 10 to run faster and check my code/pipeline
"run_name": "laptop_run_dna-bert-6-mqtl_classification", # Override run_name here
"per_device_train_batch_size": 1,
"gradient_accumulation_steps": 32,
"gradient_checkpointing": True,
"learning_rate": 1e-3,
"save_safetensors": False, # I added it. this solves the runtime error!
# not sure if it is a good idea. sklearn may slow down training, causing time loss... if so, disable these 2 lines below
"evaluation_strategy": "epoch", # To calculate metrics per epoch
"logging_strategy": "epoch" # Extra: to log training data stats for loss
}
training_args = TrainingArguments(**args)
# train_dataset, eval_dataset, test_dataset = create_data_module(tokenizer=tokenizer, WINDOW=WINDOW,
# batch_size=batch_size,
# is_debug=False)
""" # example code
max_length = 32_000
sequence = 'ACTG' * int(max_length / 4)
# sequence = 'ACTG' * int(1000) # seq_len = 4000 it works!
sequence = [sequence] * 8 # Create 8 identical samples
tokenized = tokenizer(sequence)["input_ids"]
labels = [0, 1] * 4
# Create a dataset for training
run_the_code_ds = Dataset.from_dict({"input_ids": tokenized, "labels": labels})
run_the_code_ds.set_format("pt")
"""
train_ds, val_ds, test_ds = create_paging_train_val_test_datasets(tokenizer, WINDOW=WINDOW, is_debug=False)
# train_ds, val_ds, test_ds = run_the_code_ds, run_the_code_ds, run_the_code_ds
# train_ds.set_format("pt") # doesn't work!
trainer = Trainer(
model=classifier_model,
args=training_args,
train_dataset=train_ds,
eval_dataset=val_ds,
compute_metrics=compute_metrics_using_sklearn # torch_metrics.compute_metrics
)
# train, and validate
result = trainer.train()
try:
print(f"{result = }")
except Exception as x:
print(f"{x = }")
# testing
try:
# with torch.no_grad(): # didn't work :/
test_results = trainer.evaluate(eval_dataset=test_ds)
print(f"{test_results = }")
except Exception as oome:
print(f"{oome = }")
finally:
# save the model
model_name = "DnaBert6MQtlClassifier"
classifier_model.save_pretrained(save_directory=model_local_directory, safe_serialization=False)
# push to the hub
commit_message = f":tada: Push model for window size {WINDOW} from huggingface space"
if is_my_laptop:
commit_message = f":tada: Push model for window size {WINDOW} from zephyrus"
classifier_model.push_to_hub(
repo_id=model_remote_repository,
# subfolder=f"my-awesome-model-{WINDOW}", subfolder didn't work :/
commit_message=commit_message, # f":tada: Push model for window size {WINDOW}"
safe_serialization=False
)
pass
def interprete_demo():
is_my_laptop = True
WINDOW = 4000
batch_size = 100
model_local_directory = f"my-awesome-model-{WINDOW}"
model_remote_repository = f"fahimfarhan/dnabert-6-mqtl-classifier-{WINDOW}"
try:
classifier_model = AutoModel.from_pretrained(model_remote_repository)
# todo: use captum / gentech-grelu to interpret the model
except Exception as x:
print(x)
if __name__ == '__main__':
start()
pass