File size: 3,752 Bytes
73321dd
e9a8c67
 
 
9a25cef
73321dd
e9a8c67
9a25cef
e9a8c67
 
 
73321dd
e9a8c67
 
 
 
 
9a25cef
e9a8c67
 
9a25cef
e9a8c67
 
 
 
9a25cef
e9a8c67
 
 
9a25cef
e9a8c67
 
 
73321dd
e9a8c67
9a25cef
e9a8c67
 
 
 
9a25cef
e9a8c67
 
 
9a25cef
e9a8c67
 
 
9a25cef
e9a8c67
9a25cef
e9a8c67
 
 
 
 
595bead
e9a8c67
 
 
 
 
9a25cef
e9a8c67
 
 
9a25cef
e9a8c67
 
9a25cef
e9a8c67
 
9a25cef
e9a8c67
9a25cef
e9a8c67
 
 
9a25cef
e9a8c67
 
9a25cef
e9a8c67
 
9a25cef
e9a8c67
 
 
 
 
 
9a25cef
e9a8c67
 
 
 
 
 
9a25cef
 
e9a8c67
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import os
from pathlib import Path
from difflib import get_close_matches
from transformers import pipeline


class DocumentSearcher:
    """In-memory document collection with fuzzy search and a query filter.

    Documents are plain strings held in ``self.documents``. Each query is
    first passed to a Hugging Face ``sentiment-analysis`` pipeline; a query
    labelled NEGATIVE with a score above 0.8 is refused. Every other query is
    matched against the documents with ``difflib.get_close_matches``.
    """

    def __init__(self):
        """Create an empty collection and build the classifier pipeline."""
        self.documents = []
        # NOTE(review): judging by its name this checkpoint is an SST-2
        # sentiment classifier, not a model trained to recognise injection
        # attacks; it is used here as a stand-in for malicious-intent
        # detection. Confirm this is intended.
        self.malicious_detector = pipeline(
            "sentiment-analysis",
            model="distilbert-base-uncased-finetuned-sst-2-english",
        )

    def _read_files(self, paths):
        """Append the UTF-8 text of each regular file in *paths*.

        Returns:
            The number of documents appended to ``self.documents``.
        """
        added = 0
        for path in paths:
            # Directory listings may contain subdirectories; open() on one of
            # those would raise, so only regular files are read.
            if not path.is_file():
                continue
            with open(path, "r", encoding="utf-8") as file:
                self.documents.append(file.read())
            added += 1
        return added

    def load_imdb_data(self):
        """Load reviews from ``$HOME/data-sets/aclImdb/train/{pos,neg}``.

        ``HOME`` falls back to ``/`` when the variable is unset. A missing or
        empty directory is reported and skipped instead of raising.
        """
        home_dir = Path(os.getenv("HOME", "/"))
        data_dir = home_dir / "data-sets/aclImdb/train"
        pos_dir = data_dir / "pos"
        neg_dir = data_dir / "neg"

        print(f"Looking for positive reviews in: {pos_dir}")
        print(f"Looking for negative reviews in: {neg_dir}")

        loaded = 0
        for label, directory in (("positive", pos_dir), ("negative", neg_dir)):
            if not directory.is_dir() or not any(directory.iterdir()):
                print(f"No {label} reviews found.")
                continue
            loaded += self._read_files(directory.iterdir())

        # Report only what this call added, so the figure stays correct when
        # other documents were already in the collection.
        print(f"Loaded {loaded} movie reviews from IMDB dataset.")

    def load_txt_files(self, txt_dir=None):
        """Load every ``*.txt`` file directly inside *txt_dir*.

        Args:
            txt_dir: Directory to scan, as a ``str`` or ``Path``. Defaults to
                ``$HOME/data-sets/txt-files/`` (``HOME`` falls back to ``/``).
        """
        if txt_dir is None:
            home_dir = Path(os.getenv("HOME", "/"))
            txt_dir = home_dir / "data-sets/txt-files/"
        else:
            # Accept plain string paths as well as Path objects.
            txt_dir = Path(txt_dir)

        if not txt_dir.is_dir():
            print("No .txt files directory found.")
            return

        added = self._read_files(txt_dir.glob("*.txt"))
        print(f"Loaded additional {added} documents from .txt files.")

    def is_query_malicious(self, query):
        """Return True when the classifier flags *query*.

        The query is flagged when the pipeline's first result has the label
        ``"NEGATIVE"`` and a score strictly above 0.8; a warning with the
        score is printed in that case.
        """
        result = self.malicious_detector(query)[0]
        label = result['label']
        score = result['score']

        if label == "NEGATIVE" and score > 0.8:
            print(f"Warning: Malicious query detected - Confidence: {score:.4f}")
            return True
        return False

    def search_documents(self, query):
        """Return up to five documents that fuzzily match *query*.

        Returns:
            A list of dicts, each with a ``"document"`` and a ``"similarity"``
            key. A blocked query or an empty result yields a single
            placeholder entry with similarity 0.0. Otherwise ``"document"`` is
            the match, cut to 100 characters plus ``"..."`` when it is longer,
            and ``"similarity"`` is its ``SequenceMatcher`` ratio against the
            query.
        """
        # Local import keeps this block self-contained without touching the
        # file-level import list.
        from difflib import SequenceMatcher

        if self.is_query_malicious(query):
            return [{"document": "ANOMALY: Query blocked due to detected malicious intent.", "similarity": 0.0}]

        # Use fuzzy matching for normal queries
        matches = get_close_matches(query, self.documents, n=5, cutoff=0.3)

        if not matches:
            return [{"document": "No matching documents found.", "similarity": 0.0}]

        # Same orientation get_close_matches uses (query as seq2), so the
        # reported score is the one the match was ranked by.
        scorer = SequenceMatcher()
        scorer.set_seq2(query)

        results = []
        for match in matches:
            scorer.set_seq1(match)
            # Only mark the snippet with an ellipsis when text was cut off.
            snippet = match if len(match) <= 100 else match[:100] + "..."
            results.append({"document": snippet, "similarity": scorer.ratio()})
        return results

# Test the system with normal and malicious queries
def test_document_search():
    """Run an end-to-end demo: one ordinary query, then one SQL-style query.

    Builds a ``DocumentSearcher``, loads the IMDB reviews and the extra
    ``.txt`` files, then for each query runs the search and prints a heading
    followed by one line per returned document.
    """
    engine = DocumentSearcher()

    # Fill the collection: IMDB reviews first, then the additional .txt files.
    engine.load_imdb_data()
    engine.load_txt_files()

    # (heading, query) pairs, in the order they are executed and printed.
    scenarios = (
        (
            "Normal Query Results:",
            "This movie had great acting and a compelling storyline.",
        ),
        (
            "\nMalicious Query Results:",
            "DROP TABLE reviews; SELECT * FROM confidential_data;",
        ),
    )

    for heading, text in scenarios:
        # Search before printing the heading so any warning emitted during
        # the search appears above it.
        hits = engine.search_documents(text)
        print(heading)
        for hit in hits:
            print(f"Document: {hit['document']}")

# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    test_document_search()