File size: 3,000 Bytes
dda22df
 
 
 
 
 
 
 
 
 
 
 
 
 
3dd5cf3
dda22df
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7210215
dda22df
 
 
 
 
3dd5cf3
 
 
dda22df
 
 
 
 
 
 
 
 
 
3dd5cf3
dda22df
 
 
3dd5cf3
 
 
 
dda22df
3dd5cf3
dda22df
 
 
 
 
7210215
dda22df
 
 
 
 
 
3dd5cf3
dda22df
 
7210215
dda22df
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import os
from typing import Dict, List, Tuple

from PIL import Image
import pytesseract
import requests
from dotenv import load_dotenv, find_dotenv


class Tools:
    def __init__(self):
        load_dotenv(find_dotenv())

        self.safebrowsing_key = os.getenv("SAFEBROWSING_API_KEY")
        self.base_url = "https://safebrowsing.googleapis.com/v4"
        self.client_id = "minerva"
        self.client_version = "0.1.0"
        self.threat_types = [
            "MALWARE",
            "SOCIAL_ENGINEERING",
            "UNWANTED_SOFTWARE",
            "POTENTIALLY_HARMFUL_APPLICATION"
        ]

    def ocr(self, image_path: str) -> str:
        """Extract text from image using OCR
        """
        try:
            image = Image.open(image_path)
            text = pytesseract.image_to_string(image)
            return text
        except Exception as e:
            return f"Error in text extraction: {str(e)}"

    def expand_url(self, url: str) -> str:
        """Expand shortened URL
        """
        try:
            response = requests.head(url, allow_redirects=True)
            return response.url
        except requests.exceptions.RequestException as e:
            return url  # Return original URL if expansion fails
    
    def is_url_safe(self, target_url: str) -> Tuple[str, List[Dict[str, str]]]:
        """Check if URL is safe using Google Safe Browsing API
        """
        if not self.safebrowsing_key:
            raise ValueError("SAFEBROWSING_API_KEY is missing.")
        
        safe_endpoint = f"{self.base_url}/threatMatches:find?key={self.safebrowsing_key}"
        expanded_url = self.expand_url(target_url)

        request_body = {
            "client": {
                "clientId": self.client_id,
                "clientVersion": self.client_version
            },
            "threatInfo": {
                "threatTypes": self.threat_types,
                "platformTypes": ["ANY_PLATFORM"],
                "threatEntryTypes": ["URL"],
                "threatEntries": [
                    {"url": target_url}
                ]
            }
        }

        if expanded_url != target_url:
            request_body["threatInfo"]["threatEntries"].append({"url": expanded_url})
        
        try:
            response = requests.post(safe_endpoint, json=request_body)
            response.raise_for_status()

            result = response.json()
            
            if not result:
                return "Not Flagged", []
            
            threats = []
            if "matches" in result:
                for match in result["matches"]:
                    threats.append({
                        "threat_type": match.get("threatType"),
                        "threat_url": match.get("threat", {}).get("url"),
                    })
            
            return "Flagged", threats
            
        except requests.exceptions.RequestException as e:
            raise Exception(f"Error checking URL safety: {str(e)}")