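# A minimal sketch of the environment this script assumes (not stated in the source):
# the transformers pipeline needs a backend such as PyTorch installed
# (e.g. `pip install transformers torch`), and the IMDB review folders are
# expected under ~/data-sets/aclImdb/train as used below.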
import os
from pathlib import Path
from difflib import SequenceMatcher, get_close_matches

from transformers import pipeline


class DocumentSearcher:
    def __init__(self):
        self.documents = []
        # Pre-trained sentiment model used as a proxy for malicious-intent detection
        self.malicious_detector = pipeline(
            "sentiment-analysis",
            model="distilbert-base-uncased-finetuned-sst-2-english",
        )

    def load_imdb_data(self):
        home_dir = Path(os.getenv("HOME", "/"))
        data_dir = home_dir / "data-sets/aclImdb/train"
        pos_dir = data_dir / "pos"
        neg_dir = data_dir / "neg"

        print(f"Looking for positive reviews in: {pos_dir}")
        print(f"Looking for negative reviews in: {neg_dir}")

        # Bail out early if either directory is missing or empty
        if not pos_dir.exists() or not any(pos_dir.iterdir()):
            print("No positive reviews found.")
            return
        if not neg_dir.exists() or not any(neg_dir.iterdir()):
            print("No negative reviews found.")
            return

        for filename in pos_dir.iterdir():
            with open(filename, "r", encoding="utf-8") as file:
                self.documents.append(file.read())
        for filename in neg_dir.iterdir():
            with open(filename, "r", encoding="utf-8") as file:
                self.documents.append(file.read())

        print(f"Loaded {len(self.documents)} movie reviews from the IMDB dataset.")

    def load_txt_files(self, txt_dir=None):
        if txt_dir is None:
            home_dir = Path(os.getenv("HOME", "/"))
            txt_dir = home_dir / "data-sets/txt-files/"
        txt_dir = Path(txt_dir)
        if not txt_dir.exists():
            print("No .txt files directory found.")
            return

        # Count only the newly added .txt documents, not everything loaded so far
        count_before = len(self.documents)
        for filename in txt_dir.glob("*.txt"):
            with open(filename, "r", encoding="utf-8") as file:
                self.documents.append(file.read())
        print(f"Loaded {len(self.documents) - count_before} additional documents from .txt files.")

    def is_query_malicious(self, query):
        # Use the pre-trained model to check whether the query has malicious intent
        result = self.malicious_detector(query)[0]
        label = result["label"]
        score = result["score"]

        # Consider the query malicious if the sentiment is negative with high confidence
        if label == "NEGATIVE" and score > 0.8:
            print(f"Warning: Malicious query detected - Confidence: {score:.4f}")
            return True
        return False

    def search_documents(self, query):
        if self.is_query_malicious(query):
            return [{"document": "ANOMALY: Query blocked due to detected malicious intent.", "similarity": 0.0}]

        # Use fuzzy matching for normal queries
        matches = get_close_matches(query, self.documents, n=5, cutoff=0.3)
        if not matches:
            return [{"document": "No matching documents found.", "similarity": 0.0}]
        # get_close_matches does not expose its scores, so recompute each ratio
        return [{"document": match[:100] + "...", "similarity": SequenceMatcher(None, query, match).ratio()}
                for match in matches]


# Test the system with a normal query and a query-injection attempt
def test_document_search():
    searcher = DocumentSearcher()

    # Load the IMDB movie reviews
    searcher.load_imdb_data()

    # Load additional .txt files
    searcher.load_txt_files()

    # Perform a normal query
    normal_query = "This movie had great acting and a compelling storyline."
    normal_results = searcher.search_documents(normal_query)
    print("Normal Query Results:")
    for result in normal_results:
        print(f"Document: {result['document']}")

    # Perform a query-injection attack
    malicious_query = "DROP TABLE reviews; SELECT * FROM confidential_data;"
    attack_results = searcher.search_documents(malicious_query)
    print("\nMalicious Query Results:")
    for result in attack_results:
        print(f"Document: {result['document']}")


if __name__ == "__main__":
    test_document_search()