import os
import time
import hashlib
import logging
import datetime
import csv
import threading
from urllib.parse import urlparse

import torch
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
from webdriver_manager.chrome import ChromeDriverManager  # Resolves a matching ChromeDriver binary automatically
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import gradio as gr
import xml.etree.ElementTree as ET
# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)

# Define constants
DEFAULT_FILE_PATH = "scraped_data"
PURPOSE = (
    "Visit Culvers sites and continuously check them for changes since the last observation. "
    "Anything new is logged and appended to a CSV file stored in the log folder at user/app/scraped_data."
)
HISTORY = []
CURRENT_TASK = None
STOP_THREADS = False  # Flag to stop scraping threads


# Function to monitor URLs for changes
def monitor_urls(storage_location, urls, scrape_interval, content_type, selector=None):
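    """Poll each URL on a fixed interval, hash the selected content, and append
    a row to a per-host CSV file whenever the hash changes. Runs in a background
    thread until the global STOP_THREADS flag is set."""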
    global HISTORY, STOP_THREADS
    previous_hashes = {url: "" for url in urls}

    options = Options()
    options.add_argument("--headless")  # Run Chrome in headless mode
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")

    try:
        driver = webdriver.Chrome(
            service=Service(ChromeDriverManager().install()), options=options
        )
    except Exception as e:
        logging.error(f"Error initializing ChromeDriver: {e}")
        return
    try:
        while not STOP_THREADS:
            for url in urls:
                try:
                    driver.get(url)
                    WebDriverWait(driver, 10).until(
                        EC.presence_of_element_located((By.TAG_NAME, "body"))
                    )  # Wait for basic page load
                    time.sleep(2)  # Additional wait for dynamic content

                    if content_type == "text":
                        current_content = driver.page_source
                    elif content_type == "media":
                        if selector:
                            try:
                                elements = WebDriverWait(driver, 5).until(
                                    EC.presence_of_all_elements_located(
                                        (By.CSS_SELECTOR, selector)
                                    )
                                )
                                current_content = [
                                    element.get_attribute("src") for element in elements
                                ]
                            except TimeoutException:
                                logging.warning(
                                    f"Timeout waiting for media elements with selector '{selector}' on {url}"
                                )
                                current_content = []
                        else:
                            elements = driver.find_elements(By.TAG_NAME, "img")
                            current_content = [
                                element.get_attribute("src") for element in elements
                            ]
                    else:  # "both" (and any other value) falls back to the full page source
                        current_content = driver.page_source

                    current_hash = hashlib.md5(
                        str(current_content).encode("utf-8")
                    ).hexdigest()
                    if current_hash != previous_hashes[url]:
                        previous_hashes[url] = current_hash
                        date_time_str = datetime.datetime.now().strftime(
                            "%Y-%m-%d %H:%M:%S"
                        )
                        HISTORY.append(f"Change detected at {url} on {date_time_str}")

                        csv_file_path = os.path.join(
                            storage_location, f"{urlparse(url).hostname}_changes.csv"
                        )
                        os.makedirs(storage_location, exist_ok=True)
                        file_exists = os.path.isfile(csv_file_path)
                        with open(
                            csv_file_path, "a", newline="", encoding="utf-8"
                        ) as csvfile:
                            fieldnames = ["date", "time", "url", "change"]
                            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                            if not file_exists:
                                writer.writeheader()
                            writer.writerow(
                                {
                                    "date": date_time_str.split()[0],
                                    "time": date_time_str.split()[1],
                                    "url": url,
                                    "change": "Content changed",
                                }
                            )
                        logging.info(f"Change detected at {url} on {date_time_str}")
                except Exception as e:
                    # Broad catch so one failing URL doesn't kill the monitoring loop
                    logging.error(f"Error accessing {url}: {e}")
            time.sleep(scrape_interval * 60)  # Check every scrape_interval minutes
    finally:
        driver.quit()
        logging.info("ChromeDriver session ended.")


# Function to start scraping
def start_scraping(storage_location, urls, scrape_interval, content_type, selector=None):
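    """Log an initial observation for each URL, then launch a daemon thread
    running monitor_urls. `urls` may be a list or a comma-separated string
    (as submitted from the Gradio textbox)."""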
    global CURRENT_TASK, HISTORY, STOP_THREADS

    if STOP_THREADS:
        STOP_THREADS = False  # Reset the flag if previously stopped

    # The Gradio textbox submits one comma-separated string; normalize it to a list
    if isinstance(urls, str):
        urls = [u.strip() for u in urls.split(",") if u.strip()]

    CURRENT_TASK = f"Monitoring URLs: {', '.join(urls)}"
    HISTORY.append(f"Task started: {CURRENT_TASK}")

    for url in urls:
        # Create a folder for the URL
        hostname = urlparse(url).hostname
        folder_path = os.path.join(storage_location, hostname)
        os.makedirs(folder_path, exist_ok=True)
        # Log the initial observation
        driver = None  # Guard so the finally block is safe if Chrome fails to start
        try:
            options = Options()
            options.add_argument("--headless")  # Run Chrome in headless mode
            options.add_argument("--no-sandbox")
            options.add_argument("--disable-dev-shm-usage")
            driver = webdriver.Chrome(
                service=Service(ChromeDriverManager().install()), options=options
            )
            driver.get(url)
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.TAG_NAME, "body"))
            )  # Wait for basic page load
            time.sleep(2)  # Additional wait for dynamic content

            if content_type == "text":
                initial_content = driver.page_source
            elif content_type == "media":
                if selector:
                    try:
                        elements = WebDriverWait(driver, 5).until(
                            EC.presence_of_all_elements_located(
                                (By.CSS_SELECTOR, selector)
                            )
                        )
                        initial_content = [
                            element.get_attribute("src") for element in elements
                        ]
                    except TimeoutException:
                        logging.warning(
                            f"Timeout waiting for media elements with selector '{selector}' on {url}"
                        )
                        initial_content = []
                else:
                    elements = driver.find_elements(By.TAG_NAME, "img")
                    initial_content = [
                        element.get_attribute("src") for element in elements
                    ]
            else:  # "both" (and any other value) falls back to the full page source
                initial_content = driver.page_source

            initial_hash = hashlib.md5(
                str(initial_content).encode("utf-8")
            ).hexdigest()
            HISTORY.append(f"Initial observation at {url}: {initial_hash}")

            initial_observation_path = os.path.join(
                folder_path, f"{hostname}_initial_observation.txt"
            )
            with open(initial_observation_path, "w", encoding="utf-8") as file:
                file.write(f"Initial observation at {url}: {initial_hash}")
            logging.info(f"Initial observation logged for {url}")
        except Exception as e:
            HISTORY.append(f"Error accessing {url}: {e}")
            logging.error(f"Error accessing {url}: {e}")
        finally:
            if driver is not None:
                driver.quit()
    # Start a new thread for monitoring URLs
    monitor_thread = threading.Thread(
        target=monitor_urls,
        args=(storage_location, urls, scrape_interval, content_type, selector),
        daemon=True,
    )
    monitor_thread.start()
    logging.info("Started scraping thread.")
    return f"Started scraping {', '.join(urls)} every {scrape_interval} minutes."


# Function to stop scraping
def stop_scraping():
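    """Signal the monitoring thread to exit after its current polling cycle."""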
    global STOP_THREADS
    STOP_THREADS = True
    HISTORY.append("Scraping stopped by user.")
    logging.info("Scraping stop signal sent.")
    return "Scraping has been stopped."


# Function to display CSV content
def display_csv(storage_location, url):
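    """Return the raw contents of the per-host changes CSV for the given URL."""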
    hostname = urlparse(url).hostname
    csv_path = os.path.join(storage_location, f"{hostname}_changes.csv")
    if os.path.exists(csv_path):
        try:
            with open(csv_path, "r", encoding="utf-8") as file:
                return file.read()
        except Exception as e:
            logging.error(f"Error reading CSV file for {url}: {e}")
            return f"Error reading CSV file for {url}: {e}"
    else:
        return "No data available."


# Function to generate RSS feed for a given URL
def generate_rss_feed(storage_location, url):
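    """Build an RSS 2.0 XML document from the last 10 rows of the per-host changes CSV."""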
    hostname = urlparse(url).hostname
    csv_path = os.path.join(storage_location, f"{hostname}_changes.csv")
    if os.path.exists(csv_path):
        try:
            # Parse the CSV file
            with open(csv_path, "r", encoding="utf-8") as file:
                reader = csv.DictReader(file)
                changes = list(reader)

            # Create the root RSS element
            rss = ET.Element("rss", version="2.0")
            channel = ET.SubElement(rss, "channel")

            # Add channel elements
            title = ET.SubElement(channel, "title")
            title.text = f"RSS Feed for {hostname}"
            link = ET.SubElement(channel, "link")
            link.text = url
            description = ET.SubElement(channel, "description")
            description.text = "Recent changes detected on the website."

            # Add items to the feed
            for change in changes[-10:]:  # Last 10 changes
                item = ET.SubElement(channel, "item")
                item_title = ET.SubElement(item, "title")
                item_title.text = f"Change detected at {change['url']}"
                item_link = ET.SubElement(item, "link")
                item_link.text = change["url"]
                item_description = ET.SubElement(item, "description")
                item_description.text = (
                    f"Content changed on {change['date']} at {change['time']}"
                )
                pub_date = ET.SubElement(item, "pubDate")
                pub_date.text = datetime.datetime.strptime(
                    f"{change['date']} {change['time']}", "%Y-%m-%d %H:%M:%S"
                ).strftime("%a, %d %b %Y %H:%M:%S +0000")

            # Generate the XML string
            rss_feed = ET.tostring(rss, encoding="utf-8")
            return rss_feed.decode("utf-8")
        except Exception as e:
            logging.error(f"Error generating RSS feed for {url}: {e}")
            return f"Error generating RSS feed for {url}: {e}"
    else:
        return "No data available."


# Chat response function backed by the Mixtral instruct model
def respond(message, history, system_message, max_tokens, temperature, top_p):
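    """Generate a reply with a lazily initialized text-generation pipeline. The
    model is cached on the function object so it is only loaded once per process."""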
    # Load the model and tokenizer once, caching them on the function object
    if not hasattr(respond, "pipe"):
        try:
            model_name = "mistralai/Mixtral-8x7B-Instruct-v0.1"
            respond.tokenizer = AutoTokenizer.from_pretrained(model_name)
            # Mixtral is a causal (decoder-only) LM, so it needs
            # AutoModelForCausalLM rather than AutoModelForSeq2SeqLM
            respond.model = AutoModelForCausalLM.from_pretrained(model_name)
            respond.pipe = pipeline(
                "text-generation",
                model=respond.model,
                tokenizer=respond.tokenizer,
                device=0 if torch.cuda.is_available() else -1,
            )
            logging.info("Model loaded successfully.")
        except Exception as e:
            logging.error(f"Error loading model: {e}")
            return "Error loading the response model."

    try:
        prompt = (
            f"System: {system_message}\n"
            f"History: {history}\n"
            f"User: {message}\n"
            f"Assistant:"
        )
        response = respond.pipe(
            prompt,
            max_new_tokens=max_tokens,
            do_sample=True,  # Sampling must be enabled for temperature/top_p to apply
            temperature=temperature,
            top_p=top_p,
            return_full_text=False,  # Return only the generated continuation, not the prompt
        )[0]["generated_text"]
        return response
    except Exception as e:
        logging.error(f"Error generating response: {e}")
        return "Error generating response."


# Define the Gradio interface
def create_interface():
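    """Assemble the Gradio Blocks UI: scraping controls, chat panel, and
    CSV/RSS inspection widgets."""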
    with gr.Blocks() as demo:
        gr.Markdown("# All-in-One Scraper, Database, and RSS Feeder")
        with gr.Row():
            with gr.Column():
                storage_location = gr.Textbox(
                    value=DEFAULT_FILE_PATH, label="Storage Location"
                )
                urls = gr.Textbox(
                    label="URLs (comma separated)",
                    placeholder="https://example.com, https://anotherexample.com",
                )
                scrape_interval = gr.Slider(
                    minimum=1,
                    maximum=60,
                    value=5,
                    step=1,
                    label="Scrape Interval (minutes)",
                )
                content_type = gr.Radio(
                    choices=["text", "media", "both"],
                    value="text",
                    label="Content Type",
                )
                selector = gr.Textbox(
                    label="CSS Selector for Media (Optional)",
                    placeholder="e.g., img.main-image",
                )
                start_button = gr.Button("Start Scraping")
                stop_button = gr.Button("Stop Scraping")
                csv_output = gr.Textbox(label="CSV Output", interactive=False, lines=2)
            with gr.Column():
                chat_history = gr.Chatbot(label="Chat History")
                with gr.Row():
                    message = gr.Textbox(
                        label="Message", placeholder="Type your message here..."
                    )
                    system_message = gr.Textbox(
                        value="You are a helpful assistant.", label="System message"
                    )
                    max_tokens = gr.Slider(
                        minimum=1,
                        maximum=2048,
                        value=512,
                        step=1,
                        label="Max new tokens",
                    )
                    temperature = gr.Slider(
                        minimum=0.1,
                        maximum=4.0,
                        value=0.7,
                        step=0.1,
                        label="Temperature",
                    )
                    top_p = gr.Slider(
                        minimum=0.1,
                        maximum=1.0,
                        value=0.95,
                        step=0.05,
                        label="Top-p (nucleus sampling)",
                    )
                response_box = gr.Textbox(label="Response", interactive=False, lines=2)
        with gr.Row():
            selected_url_csv = gr.Textbox(
                label="Select URL for CSV Content",
                placeholder="https://example.com",
            )
            csv_button = gr.Button("Display CSV Content")
            csv_content_output = gr.Textbox(
                label="CSV Content Output", interactive=False, lines=10
            )
        with gr.Row():
            selected_url_rss = gr.Textbox(
                label="Select URL for RSS Feed",
                placeholder="https://example.com",
            )
            rss_button = gr.Button("Generate RSS Feed")
            rss_output = gr.Textbox(
                label="RSS Feed Output", interactive=False, lines=20
            )

        # Connect buttons to their respective functions
        start_button.click(
            fn=start_scraping,
            inputs=[
                storage_location,
                urls,  # Pass the component itself; its value arrives as a string
                scrape_interval,
                content_type,
                selector,
            ],
            outputs=csv_output,
        )
        stop_button.click(fn=stop_scraping, outputs=csv_output)
        csv_button.click(
            fn=display_csv,
            inputs=[storage_location, selected_url_csv],
            outputs=csv_content_output,
        )
        rss_button.click(
            fn=generate_rss_feed,
            inputs=[storage_location, selected_url_rss],
            outputs=rss_output,
        )

        # Connect message submission to the chat interface
        def update_chat(message, history, system_message, max_tokens, temperature, top_p):
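            """Run the model and append the (message, response) pair to the chat history."""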
            history = history or []  # Chatbot may pass None before the first turn
            response = respond(
                message, history, system_message, max_tokens, temperature, top_p
            )
            history.append((message, response))
            return history, response

        message.submit(
            update_chat,
            inputs=[
                message,
                chat_history,
                system_message,
                max_tokens,
                temperature,
                top_p,
            ],
            outputs=[chat_history, response_box],
        )
    return demo


if __name__ == "__main__":
    demo = create_interface()
    demo.launch()