import os
import time
import hashlib
import logging
import datetime
import csv
import threading
from urllib.parse import urlparse

import torch
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import (
    TimeoutException,
    NoSuchElementException,
    StaleElementReferenceException,
)
from webdriver_manager.chrome import ChromeDriverManager
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import gradio as gr
import xml.etree.ElementTree as ET

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)

# Define constants
DEFAULT_FILE_PATH = "scraped_data"
PURPOSE = (
    "You go to Culvers sites and continuously seek changes on them since your last observation. "
    "Anything new gets logged and dumped into a CSV stored in your log folder at user/app/scraped_data."
)
HISTORY = []
CURRENT_TASK = None
STOP_THREADS = False  # Flag to stop scraping threads


# Function to monitor URLs for changes
def monitor_urls(storage_location, urls, scrape_interval, content_type, selector=None):
    global HISTORY, STOP_THREADS
    previous_hashes = {url: "" for url in urls}

    options = Options()
    options.add_argument("--headless")  # Run Chrome in headless mode
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")

    try:
        driver = webdriver.Chrome(
            service=Service(ChromeDriverManager().install()), options=options
        )
    except Exception as e:
        logging.error(f"Error initializing ChromeDriver: {e}")
        return

    try:
        while not STOP_THREADS:
            for url in urls:
                try:
                    driver.get(url)
                    WebDriverWait(driver, 10).until(
                        EC.presence_of_element_located((By.TAG_NAME, "body"))
                    )  # Wait for basic page load
                    time.sleep(2)  # Additional wait for dynamic content

                    if content_type == "text":
                        current_content = driver.page_source
                    elif content_type == "media":
                        if selector:
                            try:
                                elements = WebDriverWait(driver, 5).until(
                                    EC.presence_of_all_elements_located(
                                        (By.CSS_SELECTOR, selector)
                                    )
                                )
                                current_content = [
                                    element.get_attribute("src") for element in elements
                                ]
                            except TimeoutException:
                                logging.warning(
                                    f"Timeout waiting for media elements with selector '{selector}' on {url}"
                                )
                                current_content = []
                        else:
                            elements = driver.find_elements(By.TAG_NAME, "img")
                            current_content = [
                                element.get_attribute("src") for element in elements
                            ]
                    else:
                        current_content = driver.page_source

                    current_hash = hashlib.md5(
                        str(current_content).encode("utf-8")
                    ).hexdigest()

                    if current_hash != previous_hashes[url]:
                        previous_hashes[url] = current_hash
                        date_time_str = datetime.datetime.now().strftime(
                            "%Y-%m-%d %H:%M:%S"
                        )
                        HISTORY.append(f"Change detected at {url} on {date_time_str}")

                        csv_file_path = os.path.join(
                            storage_location, f"{urlparse(url).hostname}_changes.csv"
                        )
                        os.makedirs(storage_location, exist_ok=True)
                        file_exists = os.path.isfile(csv_file_path)

                        with open(
                            csv_file_path, "a", newline="", encoding="utf-8"
                        ) as csvfile:
                            fieldnames = ["date", "time", "url", "change"]
                            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                            if not file_exists:
                                writer.writeheader()
                            writer.writerow(
                                {
                                    "date": date_time_str.split()[0],
                                    "time": date_time_str.split()[1],
                                    "url": url,
                                    "change": "Content changed",
                                }
                            )
                        logging.info(f"Change detected at {url} on {date_time_str}")
                except (
                    NoSuchElementException,
                    StaleElementReferenceException,
                    TimeoutException,
                    Exception,
                ) as e:
                    logging.error(f"Error accessing {url}: {e}")
            time.sleep(scrape_interval * 60)  # Check every scrape_interval minutes
    finally:
        driver.quit()
        logging.info("ChromeDriver session ended.")


# Function to start scraping
def start_scraping(storage_location, urls, scrape_interval, content_type, selector=None):
    global CURRENT_TASK, HISTORY, STOP_THREADS

    # Accept either a list of URLs or a comma-separated string from the UI
    if isinstance(urls, str):
        urls = [u.strip() for u in urls.split(",") if u.strip()]

    if STOP_THREADS:
        STOP_THREADS = False  # Reset the flag if previously stopped

    CURRENT_TASK = f"Monitoring URLs: {', '.join(urls)}"
    HISTORY.append(f"Task started: {CURRENT_TASK}")

    for url in urls:
        # Create a folder for the URL
        hostname = urlparse(url).hostname
        folder_path = os.path.join(storage_location, hostname)
        os.makedirs(folder_path, exist_ok=True)

        # Log the initial observation
        driver = None
        try:
            options = Options()
            options.add_argument("--headless")  # Run Chrome in headless mode
            options.add_argument("--no-sandbox")
            options.add_argument("--disable-dev-shm-usage")

            driver = webdriver.Chrome(
                service=Service(ChromeDriverManager().install()), options=options
            )
            driver.get(url)
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.TAG_NAME, "body"))
            )  # Wait for basic page load
            time.sleep(2)  # Additional wait for dynamic content

            if content_type == "text":
                initial_content = driver.page_source
            elif content_type == "media":
                if selector:
                    try:
                        elements = WebDriverWait(driver, 5).until(
                            EC.presence_of_all_elements_located(
                                (By.CSS_SELECTOR, selector)
                            )
                        )
                        initial_content = [
                            element.get_attribute("src") for element in elements
                        ]
                    except TimeoutException:
                        logging.warning(
                            f"Timeout waiting for media elements with selector '{selector}' on {url}"
                        )
                        initial_content = []
                else:
                    elements = driver.find_elements(By.TAG_NAME, "img")
                    initial_content = [
                        element.get_attribute("src") for element in elements
                    ]
            else:
                initial_content = driver.page_source

            initial_hash = hashlib.md5(
                str(initial_content).encode("utf-8")
            ).hexdigest()
            HISTORY.append(f"Initial observation at {url}: {initial_hash}")

            initial_observation_path = os.path.join(
                folder_path, f"{hostname}_initial_observation.txt"
            )
            with open(initial_observation_path, "w", encoding="utf-8") as file:
                file.write(f"Initial observation at {url}: {initial_hash}")
            logging.info(f"Initial observation logged for {url}")
        except (
            NoSuchElementException,
            StaleElementReferenceException,
            TimeoutException,
            Exception,
        ) as e:
            HISTORY.append(f"Error accessing {url}: {e}")
            logging.error(f"Error accessing {url}: {e}")
        finally:
            if driver is not None:
                driver.quit()

    # Start a new thread for monitoring URLs
    monitor_thread = threading.Thread(
        target=monitor_urls,
        args=(storage_location, urls, scrape_interval, content_type, selector),
        daemon=True,
    )
    monitor_thread.start()
    logging.info("Started scraping thread.")
    return f"Started scraping {', '.join(urls)} every {scrape_interval} minutes."


# Function to stop scraping
def stop_scraping():
    global STOP_THREADS
    STOP_THREADS = True
    HISTORY.append("Scraping stopped by user.")
    logging.info("Scraping stop signal sent.")
    return "Scraping has been stopped."


# Function to display CSV content
def display_csv(storage_location, url):
    hostname = urlparse(url).hostname
    csv_path = os.path.join(storage_location, f"{hostname}_changes.csv")
    if os.path.exists(csv_path):
        try:
            with open(csv_path, "r", encoding="utf-8") as file:
                content = file.read()
            return content
        except Exception as e:
            logging.error(f"Error reading CSV file for {url}: {e}")
            return f"Error reading CSV file for {url}: {e}"
    else:
        return "No data available."
# Function to generate an RSS feed for a given URL
def generate_rss_feed(storage_location, url):
    hostname = urlparse(url).hostname
    csv_path = os.path.join(storage_location, f"{hostname}_changes.csv")
    if os.path.exists(csv_path):
        try:
            # Parse the CSV file
            with open(csv_path, "r", encoding="utf-8") as file:
                reader = csv.DictReader(file)
                changes = list(reader)

            # Create the root RSS element
            rss = ET.Element("rss", version="2.0")
            channel = ET.SubElement(rss, "channel")

            # Add channel elements
            title = ET.SubElement(channel, "title")
            title.text = f"RSS Feed for {hostname}"
            link = ET.SubElement(channel, "link")
            link.text = url
            description = ET.SubElement(channel, "description")
            description.text = "Recent changes detected on the website."

            # Add items to the feed
            for change in changes[-10:]:  # Last 10 changes
                item = ET.SubElement(channel, "item")
                item_title = ET.SubElement(item, "title")
                item_title.text = f"Change detected at {change['url']}"
                item_link = ET.SubElement(item, "link")
                item_link.text = change["url"]
                item_description = ET.SubElement(item, "description")
                item_description.text = (
                    f"Content changed on {change['date']} at {change['time']}"
                )
                pub_date = ET.SubElement(item, "pubDate")
                pub_date.text = datetime.datetime.strptime(
                    f"{change['date']} {change['time']}", "%Y-%m-%d %H:%M:%S"
                ).strftime("%a, %d %b %Y %H:%M:%S +0000")

            # Generate the XML string
            rss_feed = ET.tostring(rss, encoding="utf-8")
            return rss_feed.decode("utf-8")
        except Exception as e:
            logging.error(f"Error generating RSS feed for {url}: {e}")
            return f"Error generating RSS feed for {url}: {e}"
    else:
        return "No data available."


# Chat response function using the Mixtral instruct model
def respond(message, history, system_message, max_tokens, temperature, top_p):
    # Load the model and tokenizer once, caching them on the function object
    if not hasattr(respond, "pipe"):
        try:
            model_name = "mistralai/Mixtral-8x7B-Instruct-v0.1"
            respond.tokenizer = AutoTokenizer.from_pretrained(model_name)
            # Mixtral is a decoder-only model, so it must be loaded as a causal LM
            respond.model = AutoModelForCausalLM.from_pretrained(model_name)
            respond.pipe = pipeline(
                "text-generation",
                model=respond.model,
                tokenizer=respond.tokenizer,
                device=0 if torch.cuda.is_available() else -1,
            )
            logging.info("Model loaded successfully.")
        except Exception as e:
            logging.error(f"Error loading model: {e}")
            return "Error loading the response model."

    try:
        prompt = (
            f"System: {system_message}\n"
            f"History: {history}\n"
            f"User: {message}\n"
            f"Assistant:"
        )
        response = respond.pipe(
            prompt, max_new_tokens=max_tokens, temperature=temperature, top_p=top_p
        )[0]["generated_text"]
        return response
    except Exception as e:
        logging.error(f"Error generating response: {e}")
        return "Error generating response."
# Define the Gradio interface
def create_interface():
    with gr.Blocks() as demo:
        gr.Markdown("# All-in-One Scraper, Database, and RSS Feeder")
        with gr.Row():
            with gr.Column():
                storage_location = gr.Textbox(
                    value=DEFAULT_FILE_PATH, label="Storage Location"
                )
                urls = gr.Textbox(
                    label="URLs (comma separated)",
                    placeholder="https://example.com, https://anotherexample.com",
                )
                scrape_interval = gr.Slider(
                    minimum=1,
                    maximum=60,
                    value=5,
                    step=1,
                    label="Scrape Interval (minutes)",
                )
                content_type = gr.Radio(
                    choices=["text", "media", "both"],
                    value="text",
                    label="Content Type",
                )
                selector = gr.Textbox(
                    label="CSS Selector for Media (Optional)",
                    placeholder="e.g., img.main-image",
                )
                start_button = gr.Button("Start Scraping")
                stop_button = gr.Button("Stop Scraping")
                csv_output = gr.Textbox(
                    label="CSV Output", interactive=False, lines=2
                )
            with gr.Column():
                chat_history = gr.Chatbot(label="Chat History")
                with gr.Row():
                    message = gr.Textbox(
                        label="Message", placeholder="Type your message here..."
                    )
                    system_message = gr.Textbox(
                        value="You are a helpful assistant.", label="System message"
                    )
                    max_tokens = gr.Slider(
                        minimum=1,
                        maximum=2048,
                        value=512,
                        step=1,
                        label="Max new tokens",
                    )
                    temperature = gr.Slider(
                        minimum=0.1,
                        maximum=4.0,
                        value=0.7,
                        step=0.1,
                        label="Temperature",
                    )
                    top_p = gr.Slider(
                        minimum=0.1,
                        maximum=1.0,
                        value=0.95,
                        step=0.05,
                        label="Top-p (nucleus sampling)",
                    )
                response_box = gr.Textbox(label="Response", interactive=False, lines=2)

        with gr.Row():
            selected_url_csv = gr.Textbox(
                label="Select URL for CSV Content",
                placeholder="https://example.com",
            )
            csv_button = gr.Button("Display CSV Content")
            csv_content_output = gr.Textbox(
                label="CSV Content Output", interactive=False, lines=10
            )

        with gr.Row():
            selected_url_rss = gr.Textbox(
                label="Select URL for RSS Feed",
                placeholder="https://example.com",
            )
            rss_button = gr.Button("Generate RSS Feed")
            rss_output = gr.Textbox(
                label="RSS Feed Output", interactive=False, lines=20
            )

        # Connect buttons to their respective functions
        start_button.click(
            fn=start_scraping,
            inputs=[
                storage_location,
                urls,
                scrape_interval,
                content_type,
                selector,
            ],
            outputs=csv_output,
        )
        stop_button.click(fn=stop_scraping, outputs=csv_output)
        csv_button.click(
            fn=display_csv,
            inputs=[storage_location, selected_url_csv],
            outputs=csv_content_output,
        )
        rss_button.click(
            fn=generate_rss_feed,
            inputs=[storage_location, selected_url_rss],
            outputs=rss_output,
        )

        # Connect message submission to the chat interface
        def update_chat(message, history, system_message, max_tokens, temperature, top_p):
            response = respond(
                message, history, system_message, max_tokens, temperature, top_p
            )
            history = history or []
            history.append((message, response))
            return history, response

        message.submit(
            update_chat,
            inputs=[
                message,
                chat_history,
                system_message,
                max_tokens,
                temperature,
                top_p,
            ],
            outputs=[chat_history, response_box],
        )

    return demo


if __name__ == "__main__":
    demo = create_interface()
    demo.launch()