import asyncio
import csv
import datetime
import hashlib
import logging
import os
from typing import List, Optional, Tuple

import aiohttp
import gradio as gr
import validators
from bs4 import BeautifulSoup
from huggingface_hub import AsyncInferenceClient
from sqlalchemy import Column, DateTime, Integer, String, Text, create_engine, text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import declarative_base, sessionmaker
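
# Third-party packages assumed by this script:
#   pip install aiohttp beautifulsoup4 gradio huggingface_hub sqlalchemy validators
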
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Configuration
HUGGINGFACE_API_KEY = os.getenv("HUGGINGFACE_API_KEY")
DEFAULT_MONITORING_INTERVAL = 300  # seconds
MAX_MONITORING_INTERVAL = 600  # seconds
CHANGE_FREQUENCY_THRESHOLD = 3  # changes before the polling interval is halved

# Global variables
monitoring_tasks = {}
url_monitoring_intervals = {}
change_counts = {}
history = []
db_session = None  # set in main(); shared by the monitoring tasks

# Database setup
Base = declarative_base()

class Article(Base):
    __tablename__ = 'articles'
    id = Column(Integer, primary_key=True)
    url = Column(String(255), nullable=False)
    title = Column(String(255))
    content = Column(Text)
    hash = Column(String(32))
    timestamp = Column(DateTime, default=datetime.datetime.utcnow)
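
# `hash` stores the MD5 digest of the fetched page; monitor_url compares these
# digests in memory to decide whether a page changed.
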
def create_db_engine(db_url):
    # Plain function: create_engine and create_all are synchronous calls.
    try:
        engine = create_engine(db_url)
        Base.metadata.create_all(engine)
        return engine, sessionmaker(bind=engine)
    except SQLAlchemyError as e:
        logger.error(f"Database error: {e}")
        raise
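
# Minimal usage sketch (a local SQLite file is assumed):
#   engine, Session = create_db_engine("sqlite:///monitoring.db")
#   session = Session()
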
def sanitize_url(url: str) -> Optional[str]:
    return url if validators.url(url) else None

async def fetch_url_content(url: str, session: aiohttp.ClientSession) -> Tuple[str, str]:
    async with session.get(url) as response:
        response.raise_for_status()  # surface HTTP errors as aiohttp.ClientError
        content = await response.text()
        soup = BeautifulSoup(content, 'html.parser')
        title = soup.title.string if soup.title else "No Title"
        return title, content
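
# Note: monitor_url hashes the raw HTML returned here, so pages that embed
# per-request noise (timestamps, nonces, rotating ads) will register a change
# on every poll. If that proves too sensitive, hashing only the visible text
# (e.g. soup.get_text()) is one possible refinement, not part of the original design.
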
async def save_to_database(session, url: str, title: str, content: str, hash: str):
    # The session comes from a synchronous sessionmaker, so commit/rollback are not awaited.
    try:
        article = Article(url=url, title=title, content=content, hash=hash)
        session.add(article)
        session.commit()
    except SQLAlchemyError as e:
        logger.error(f"Database error: {e}")
        session.rollback()

async def save_to_csv(storage_location: str, url: str, title: str, content: str, timestamp: datetime.datetime):
    try:
        directory = os.path.dirname(storage_location)
        if directory:  # a bare filename has no directory component to create
            os.makedirs(directory, exist_ok=True)
        with open(storage_location, "a", newline='', encoding="utf-8") as csvfile:
            csv_writer = csv.writer(csvfile)
            csv_writer.writerow([timestamp.strftime("%Y-%m-%d %H:%M:%S"), url, title, content])
    except IOError as e:
        logger.error(f"IOError saving to CSV: {e}")
    except Exception as e:
        logger.error(f"Unexpected error saving to CSV: {e}")
async def monitor_url(url: str, interval: int, storage_location: str, feed_rss: bool, db_session):
    previous_hash = ""
    async with aiohttp.ClientSession() as session:
        while True:
            try:
                title, content = await fetch_url_content(url, session)
                current_hash = hashlib.md5(content.encode('utf-8')).hexdigest()
                if current_hash != previous_hash:
                    previous_hash = current_hash
                    timestamp = datetime.datetime.now()
                    if feed_rss:
                        try:
                            await save_to_database(db_session, url, title, content, current_hash)
                        except SQLAlchemyError as e:
                            logger.error(f"Database error while saving {url}: {e}")
                    if storage_location:
                        await save_to_csv(storage_location, url, title, content, timestamp)
                    history.append(f"Change detected at {url} on {timestamp.strftime('%Y-%m-%d %H:%M:%S')}")
                    logger.info(f"Change detected at {url}")
                    change_counts[url] = change_counts.get(url, 0) + 1
                    if change_counts[url] >= CHANGE_FREQUENCY_THRESHOLD:
                        interval = max(60, interval // 2)
                else:
                    change_counts[url] = 0
                    interval = min(interval * 2, MAX_MONITORING_INTERVAL)
                url_monitoring_intervals[url] = interval
            except aiohttp.ClientError as e:
                logger.error(f"Network error monitoring {url}: {e}")
                history.append(f"Network error monitoring {url}: {e}")
            except Exception as e:
                logger.error(f"Unexpected error monitoring {url}: {e}")
                history.append(f"Unexpected error monitoring {url}: {e}")
            await asyncio.sleep(interval)
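
# Tasks are keyed by URL in monitoring_tasks, so starting an already-monitored
# URL is a no-op rather than spawning a duplicate monitor.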
async def start_monitoring(urls: str, storage_location: str, feed_rss: bool):
    # Gradio passes the Textbox value as one comma-separated string.
    for url in (u.strip() for u in urls.split(",") if u.strip()):
        if url not in monitoring_tasks:
            sanitized_url = sanitize_url(url)
            if sanitized_url:
                task = asyncio.create_task(monitor_url(sanitized_url, DEFAULT_MONITORING_INTERVAL, storage_location, feed_rss, db_session))
                monitoring_tasks[sanitized_url] = task
            else:
                logger.warning(f"Invalid URL: {url}")
                history.append(f"Invalid URL: {url}")
    return "Monitoring started"

async def cleanup_resources(url: str):
    # Add any cleanup logic here, e.g., closing database connections
    pass

async def stop_monitoring(urls: str):
    # Accepts the same comma-separated string as start_monitoring; async so
    # asyncio.create_task runs on Gradio's event loop.
    for url in (u.strip() for u in urls.split(",") if u.strip()):
        if url in monitoring_tasks:
            monitoring_tasks[url].cancel()
            asyncio.create_task(cleanup_resources(url))
            del monitoring_tasks[url]
    return "Monitoring stopped"
async def chatbot_response(message: str, history: List[Tuple[str, str]]):
    history = history or []
    try:
        client = AsyncInferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1", token=HUGGINGFACE_API_KEY)
        response = await client.text_generation(message, max_new_tokens=100)
        history.append((message, response))
        return history, ""  # clear the input box after sending
    except Exception as e:
        logger.error(f"Chatbot error: {e}")
        history.append((message, "Error: Could not get a response from the chatbot."))
        return history, ""

async def update_db_status(db_status):
    # Note: a background task cannot push values into a gr.Textbox on its own;
    # this loop only verifies connectivity and logs the result. Surfacing it in
    # the UI would need something like a gr.Timer-driven handler.
    while True:
        try:
            db_session.execute(text("SELECT 1"))
            logger.debug("Database status: connected")
        except SQLAlchemyError:
            logger.warning("Database status: disconnected")
        await asyncio.sleep(60)  # check every minute

async def update_feed_content(db_session):
    try:
        articles = db_session.query(Article).order_by(Article.timestamp.desc()).limit(20).all()
        feed = {
            'title': 'Website Changes Feed',
            'link': 'http://yourwebsite.com/feed',
            'description': 'Feed of changes detected on monitored websites.',
            'items': [{
                'title': article.title,
                'link': article.url,
                'description': article.content,
                'pubDate': article.timestamp
            } for article in articles]
        }
        return feed
    except SQLAlchemyError as e:
        logger.error(f"Database error: {e}")
        return None
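
# The refreshed feed is recomputed on a timer but not yet surfaced in the UI;
# binding it to the feed_content JSON component (e.g. via gr.Timer) is left open.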
async def periodic_update_with_error_handling(db_session):
    while True:
        try:
            await asyncio.sleep(300)  # wait for 5 minutes
            await update_feed_content(db_session)
        except Exception as e:
            logger.error(f"Error in periodic update: {e}")
async def main():
    global db_session
    try:
        engine, Session = create_db_engine("sqlite:///monitoring.db")
        db_session = Session()
    except SQLAlchemyError as e:
        logger.error(f"Failed to connect to database: {e}")
        return

    demo = gr.Blocks()
    with demo:
        gr.Markdown("# Website Monitor and Chatbot")
        with gr.Row():
            with gr.Column():
                db_url = gr.Textbox(label="Database URL", value="sqlite:///monitoring.db")
                db_status = gr.Textbox(label="Database Status", interactive=False, value="Connected")
            with gr.Column():
                with gr.Tab("Configuration"):
                    target_urls = gr.Textbox(label="Target URLs (comma-separated)", placeholder="https://example.com, https://another-site.com")
                    storage_location = gr.Textbox(label="Storage Location (CSV file path)", placeholder="/path/to/your/file.csv")
                    feed_rss_checkbox = gr.Checkbox(label="Enable RSS Feed")
                    start_button = gr.Button("Start Monitoring")
                    stop_button = gr.Button("Stop Monitoring")
                    status_text = gr.Textbox(label="Status", interactive=False)
                    history_text = gr.Textbox(label="History", lines=10, interactive=False)
                with gr.Tab("User-End View"):
                    feed_content = gr.JSON(label="RSS Feed Content")
                with gr.Tab("Chatbot"):
                    chatbot_interface = gr.Chatbot()
                    message_input = gr.Textbox(placeholder="Type your message here...")
                    send_button = gr.Button("Send")

        start_button.click(
            start_monitoring,
            inputs=[target_urls, storage_location, feed_rss_checkbox],
            outputs=status_text
        )
        stop_button.click(
            stop_monitoring,
            inputs=target_urls,
            outputs=status_text
        )
        send_button.click(
            chatbot_response,
            inputs=[message_input, chatbot_interface],
            outputs=[chatbot_interface, message_input]
        )

    asyncio.create_task(periodic_update_with_error_handling(db_session))
    asyncio.create_task(update_db_status(db_status))
    try:
        # launch() is synchronous; prevent_thread_lock=True starts the server in
        # a background thread so the asyncio loop stays free for the tasks above.
        demo.launch(prevent_thread_lock=True)
        await asyncio.Event().wait()  # keep the event loop alive
    finally:
        if db_session:
            db_session.close()
        engine.dispose()

if __name__ == "__main__":
    asyncio.run(main())