# Hugging Face Space (status: Running)
import gradio as gr | |
import torch | |
import joblib | |
import numpy as np | |
from itertools import product | |
import torch.nn as nn | |
class VirusClassifier(nn.Module):
    """Small fully connected classifier over k-mer feature vectors.

    The network is Linear(input_shape, 64) -> Linear(64, 32) ->
    Linear(32, 32) -> Linear(32, 2) with GELU activations; the first two
    hidden layers are each followed by BatchNorm1d and Dropout(0.3).
    The forward pass returns raw logits (no softmax is applied here).

    Args:
        input_shape: Number of input features per sample.
    """

    def __init__(self, input_shape: int):
        super().__init__()
        self.network = nn.Sequential(
            nn.Linear(input_shape, 64),
            nn.GELU(),
            nn.BatchNorm1d(64),
            nn.Dropout(0.3),
            nn.Linear(64, 32),
            nn.GELU(),
            nn.BatchNorm1d(32),
            nn.Dropout(0.3),
            nn.Linear(32, 32),
            nn.GELU(),
            nn.Linear(32, 2),
        )

    def forward(self, x):
        """Return the two output logits for each row of ``x``."""
        return self.network(x)

    def get_feature_importance(self, x):
        """Calculate feature importance using a gradient-based method.

        For every output logit, the gradient of that logit (summed over
        all leading dimensions) with respect to the input is computed;
        the absolute gradients are accumulated over the logits.

        The caller's tensor is left untouched: gradients are taken with
        respect to a detached copy, so ``x`` may be a non-leaf tensor and
        the method also works when called inside ``torch.no_grad()``.

        Dropout and batch norm behave differently in training mode, so
        call ``eval()`` on the module first for deterministic scores.

        Args:
            x: Input tensor accepted by ``forward``.

        Returns:
            A tensor with the same shape as ``x`` holding non-negative
            importance scores. It does not require grad.
        """
        # Gradients are needed here even if the caller disabled them.
        with torch.enable_grad():
            # Work on a private leaf copy instead of flipping
            # requires_grad on (and writing .grad into) the caller's tensor.
            inputs = x.detach().clone().requires_grad_(True)
            output = self.network(inputs)
            importance = torch.zeros_like(inputs)
            n_outputs = output.shape[-1]
            for i in range(n_outputs):
                # The graph is only needed again while logits remain.
                (grad,) = torch.autograd.grad(
                    output[..., i].sum(),
                    inputs,
                    retain_graph=i < n_outputs - 1,
                )
                importance += grad.abs()
        return importance
def sequence_to_kmer_vector(sequence: str, k: int = 4) -> np.ndarray:
    """Convert a nucleotide sequence to a k-mer frequency vector.

    The vector has one entry per k-mer over the alphabet ``ACGT``, in the
    order produced by ``itertools.product("ACGT", repeat=k)``. Each entry
    is the number of occurrences of that k-mer divided by the total number
    of length-``k`` windows in the sequence. Windows containing any other
    character (e.g. ``N``) are not counted but still contribute to the
    denominator.

    Args:
        sequence: Nucleotide sequence; matching is case-insensitive.
        k: k-mer length, must be at least 1.

    Returns:
        A ``float32`` array of length ``4 ** k``. It is all zeros when the
        sequence is shorter than ``k``.

    Raises:
        ValueError: If ``k`` is smaller than 1.
    """
    if k < 1:
        raise ValueError(f"k must be a positive integer, got {k}")

    # Map every possible k-mer to its position in the output vector.
    kmer_index = {
        ''.join(letters): position
        for position, letters in enumerate(product("ACGT", repeat=k))
    }
    vec = np.zeros(len(kmer_index), dtype=np.float32)

    # Soft-masked (lower-case) sequences would otherwise match nothing.
    sequence = sequence.upper()

    total_kmers = len(sequence) - k + 1
    if total_kmers <= 0:
        return vec

    for start in range(total_kmers):
        position = kmer_index.get(sequence[start:start + k])
        if position is not None:
            vec[position] += 1

    # Convert counts to frequencies over all windows.
    return vec / total_kmers
def parse_fasta(text):
    """Parse FASTA-formatted text into ``(header, sequence)`` pairs.

    A record starts at a line beginning with ``>``; the header is the rest
    of that line. All following non-blank lines up to the next header are
    upper-cased and concatenated to form the sequence. Blank lines are
    skipped, and lines that appear before the first header are ignored.

    Args:
        text: The FASTA content as a string.

    Returns:
        A list of ``(header, sequence)`` tuples in file order. Records with
        an empty header (a bare ``>`` line) are kept with ``''`` as header.
    """
    records = []
    header = None
    chunks = []

    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line:
            continue
        if line.startswith('>'):
            # Compare against None: an empty header is still a record.
            if header is not None:
                records.append((header, ''.join(chunks)))
            header = line[1:]
            chunks = []
        elif header is not None:
            chunks.append(line.upper())

    if header is not None:
        records.append((header, ''.join(chunks)))
    return records
def predict(file_obj):
    """Classify every sequence of an uploaded FASTA file.

    For each record the 4-mer frequency vector is computed, scaled with
    the scaler loaded from ``scaler.pkl`` and passed through the
    ``VirusClassifier`` whose weights are loaded from ``model.pt``. Output
    index 1 is reported as "human", index 0 as "non-human". The ten k-mers
    with the largest gradient-based importance are listed per sequence.

    Args:
        file_obj: The uploaded content, either ``bytes`` (decoded as UTF-8)
            or a ``str`` holding the FASTA text itself. ``None`` means
            nothing was uploaded.

    Returns:
        A human-readable report (one section per sequence, separated by a
        blank line), or an error message string. Errors are reported in
        the return value rather than raised.
    """
    # Local import keeps this block self-contained.
    import traceback

    if file_obj is None:
        return "Please upload a FASTA file"

    # Read the file content
    try:
        text = file_obj if isinstance(file_obj, str) else file_obj.decode('utf-8')
    except Exception as e:
        return f"Error reading file: {str(e)}"

    # All k-mers, in the same order used by sequence_to_kmer_vector
    k = 4  # k-mer size
    kmers = [''.join(p) for p in product("ACGT", repeat=k)]

    # Load model and scaler
    try:
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
        model = VirusClassifier(len(kmers)).to(device)  # k=4 -> 4^4 = 256 features
        # NOTE(security): torch.load and joblib.load unpickle their input;
        # only load model.pt / scaler.pkl from a trusted source.
        state_dict = torch.load('model.pt', map_location=device)
        model.load_state_dict(state_dict)
        scaler = joblib.load('scaler.pkl')
        # Evaluation mode disables dropout and uses batch-norm running stats
        model.eval()
    except Exception as e:
        # format_exc() yields the real traceback text; str() of a
        # traceback object is only its repr.
        return f"Error loading model: {str(e)}\nFull traceback: {traceback.format_exc()}"

    # Get predictions
    top_k = 10
    results = []
    try:
        sequences = parse_fasta(text)
        if not sequences:
            return "No FASTA records found in the uploaded file"

        for header, seq in sequences:
            # Raw frequency vector and its scaled counterpart
            raw_freq_vector = sequence_to_kmer_vector(seq, k)
            kmer_vector = scaler.transform(raw_freq_vector.reshape(1, -1))
            X_tensor = torch.as_tensor(kmer_vector, dtype=torch.float32).to(device)

            with torch.no_grad():
                probs = torch.softmax(model(X_tensor), dim=1)[0]

            # Needs gradients, so it must run outside the no_grad block
            importance = model.get_feature_importance(X_tensor)
            kmer_importance = importance[0].detach().cpu().numpy()

            # Rescale so the largest score is 0.002; skip when every
            # gradient is zero to avoid dividing by zero (NaN output).
            max_abs = np.max(np.abs(kmer_importance))
            if max_abs > 0:
                kmer_importance = kmer_importance / max_abs * 0.002

            # Indices of the top-k k-mers, most important first
            top_indices = np.argsort(np.abs(kmer_importance))[-top_k:][::-1]

            # Format results
            human_prob = float(probs[1])
            non_human_prob = float(probs[0])
            pred_label = 'human' if human_prob > non_human_prob else 'non-human'
            lines = [
                f"Sequence: {header}",
                f"Prediction: {pred_label}",
                f"Confidence: {max(human_prob, non_human_prob):0.4f}",
                f"Human probability: {human_prob:0.4f}",
                f"Non-human probability: {non_human_prob:0.4f}",
                "Most influential k-mers:",
            ]
            for i in top_indices:
                frequency = float(raw_freq_vector[i])
                lines.append(
                    f" {kmers[i]}: importance={float(kmer_importance[i]):.4f}, "
                    f"frequency={frequency:.4f} ({frequency*100:.2f}%), "
                    f"scaled={float(kmer_vector[0][i]):.4f}"
                )
            results.append("\n".join(lines))
    except Exception as e:
        return f"Error processing sequences: {str(e)}"

    return "\n\n".join(results)
# Gradio UI: a single file upload wired to predict(), with the report
# rendered in a text box.
iface = gr.Interface(
    title="Virus Host Classifier",
    fn=predict,
    inputs=gr.File(type="binary", label="Upload FASTA file"),
    outputs=gr.Textbox(label="Results"),
)

# Start the web server only when this file is run as a script.
if __name__ == "__main__":
    iface.launch()