import asyncio
import csv
import datetime
import hashlib
import logging
import os
import signal
from typing import List, Optional, Tuple

import aiohttp
import gradio as gr
import validators  # used by sanitize_url
from bs4 import BeautifulSoup  # used by fetch_url_content
from feedgen.feed import FeedGenerator  # assumed dependency; feedparser only parses feeds
from huggingface_hub import AsyncInferenceClient
from sqlalchemy import (Column, DateTime, Integer, String, Text, create_engine,
                        text)
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import declarative_base, sessionmaker
Base = declarative_base()


class Article(Base):
    __tablename__ = 'articles'

    id = Column(Integer, primary_key=True)
    url = Column('url', String(2048), nullable=False, unique=True)  # Increased URL length limit
    title = Column('title', String(255))
    content = Column('content', Text())
    hash_value = Column('hash', String(32))
    timestamp = Column('timestamp', DateTime(), default=datetime.datetime.utcnow)
# Configure logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Configuration
HUGGINGFACE_API_KEY = os.getenv("HUGGINGFACE_API_KEY")
DEFAULT_MONITORING_INTERVAL = 300  # seconds
MAX_MONITORING_INTERVAL = 600  # seconds
CHANGE_FREQUENCY_THRESHOLD = 3  # consecutive changes before the interval is halved

# Global state
monitoring_tasks = {}  # url -> asyncio.Task
url_monitoring_intervals = {}  # url -> current polling interval in seconds
change_counts = {}  # url -> number of consecutive change detections
history = []  # human-readable event log shown in the UI
db_session = None  # set in main()
db_status_message = "Unknown"  # latest DB status written by update_db_status()
async def create_db_engine(db_url):
    try:
        engine = create_engine(db_url)
        Base.metadata.create_all(engine)
        return engine, sessionmaker(bind=engine)
    except SQLAlchemyError as e:
        logger.error(f"Database error: {e}")
        raise
def sanitize_url(url: str) -> Optional[str]:
    # validators.url() returns a falsy failure object for invalid URLs
    return url if validators.url(url) else None
async def fetch_url_content(url: str,
                            session: aiohttp.ClientSession) -> Tuple[str, str]:
    async with session.get(url) as response:
        response.raise_for_status()  # surface HTTP errors as aiohttp.ClientError
        content = await response.text()
    soup = BeautifulSoup(content, 'html.parser')
    title = soup.title.string if soup.title else "No Title"
    return title, content
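# A minimal sketch (not part of the app) of exercising fetch_url_content on its
# own; the URL is only an example:
#
#     async def _demo_fetch():
#         async with aiohttp.ClientSession() as session:
#             title, html = await fetch_url_content("https://example.com", session)
#             print(title, len(html))
#
#     asyncio.run(_demo_fetch())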
async def save_to_database(session, url: str, title: str, content: str,
                           hash_value: str):
    try:
        # The model attribute is hash_value (mapped to the 'hash' column)
        article = Article(url=url, title=title, content=content,
                          hash_value=hash_value)
        session.add(article)
        session.commit()  # synchronous Session: commit/rollback are not awaitable
    except SQLAlchemyError as e:
        logger.error(f"Database error: {e}")
        session.rollback()
async def save_to_csv(storage_location: str, url: str, title: str,
                      content: str, timestamp: datetime.datetime):
    try:
        directory = os.path.dirname(storage_location)
        if directory:  # only create a directory when the path actually contains one
            os.makedirs(directory, exist_ok=True)
        with open(storage_location, "a", newline='',
                  encoding="utf-8") as csvfile:
            csv_writer = csv.writer(csvfile)
            csv_writer.writerow(
                [timestamp.strftime("%Y-%m-%d %H:%M:%S"), url, title, content])
    except IOError as e:
        logger.error(f"IOError saving to CSV: {e}")
    except Exception as e:
        logger.error(f"Unexpected error saving to CSV: {e}")
async def monitor_url(url: str, interval: int, storage_location: str,
                      feed_rss: bool, db_session):
    previous_hash = ""
    async with aiohttp.ClientSession() as session:
        while True:
            try:
                title, content = await fetch_url_content(url, session)
                current_hash = hashlib.md5(content.encode('utf-8')).hexdigest()
                if current_hash != previous_hash:
                    previous_hash = current_hash
                    timestamp = datetime.datetime.now()
                    if feed_rss:
                        try:
                            await save_to_database(db_session, url, title,
                                                   content, current_hash)
                        except SQLAlchemyError as e:
                            logger.error(
                                f"Database error while saving {url}: {e}")
                    if storage_location:
                        await save_to_csv(storage_location, url, title,
                                          content, timestamp)
                    history.append(
                        f"Change detected at {url} on "
                        f"{timestamp.strftime('%Y-%m-%d %H:%M:%S')}")
                    logger.info(f"Change detected at {url}")
                    # Adaptive polling: speed up (down to 60 s) while the page
                    # keeps changing, back off (up to the maximum) when it is quiet
                    change_counts[url] = change_counts.get(url, 0) + 1
                    if change_counts[url] >= CHANGE_FREQUENCY_THRESHOLD:
                        interval = max(60, interval // 2)
                else:
                    change_counts[url] = 0
                    interval = min(interval * 2, MAX_MONITORING_INTERVAL)
                url_monitoring_intervals[url] = interval
            except aiohttp.ClientError as e:
                logger.error(f"Network error monitoring {url}: {e}")
                history.append(f"Network error monitoring {url}: {e}")
            except Exception as e:
                logger.error(f"Unexpected error monitoring {url}: {e}")
                history.append(f"Unexpected error monitoring {url}: {e}")
            await asyncio.sleep(interval)
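# Worked example of the adaptive interval above, starting from the default of
# 300 s: the third consecutive change halves it to 150 s, further changes bring
# it to 75 s and then the 60 s floor; once a poll sees no change, the counter
# resets and the interval doubles each quiet cycle (120, 240, ...) until it
# reaches MAX_MONITORING_INTERVAL (600 s).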
async def start_monitoring(urls: List[str], storage_location: str,
                           feed_rss: bool):
    global db_session
    for url in urls:
        if url not in monitoring_tasks:
            sanitized_url = sanitize_url(url)
            if sanitized_url:
                task = asyncio.create_task(
                    monitor_url(sanitized_url, DEFAULT_MONITORING_INTERVAL,
                                storage_location, feed_rss, db_session))
                monitoring_tasks[sanitized_url] = task
            else:
                logger.warning(f"Invalid URL: {url}")
                history.append(f"Invalid URL: {url}")
    return "Monitoring started"
async def cleanup_resources(url: str):
    # Add any cleanup logic here, e.g., closing database connections
    pass


def stop_monitoring(url: str):
    if url in monitoring_tasks:
        monitoring_tasks[url].cancel()
        asyncio.create_task(cleanup_resources(url))
        del monitoring_tasks[url]
    return "Monitoring stopped"
def generate_rss_feed():
    """Build an RSS document from the most recent articles.

    feedparser can only parse feeds, so generation uses feedgen here
    (assumed to be available as a dependency).
    """
    try:
        articles = db_session.query(Article).order_by(
            Article.timestamp.desc()).limit(20).all()
        fg = FeedGenerator()
        fg.title('Website Changes Feed')
        fg.link(href='http://yourwebsite.com/feed')  # Replace if needed
        fg.description('Feed of changes detected on monitored websites.')
        for article in articles:
            entry = fg.add_entry()
            entry.title(article.title)
            entry.link(href=article.url)
            entry.description(article.content)
        return fg.rss_str(pretty=True).decode('utf-8')
    except SQLAlchemyError as e:
        logger.error(f"Database error: {e}")
        return None
async def chatbot_response(message: str, history: List[dict]):
    # gr.Chatbot(type='messages') expects a list of {"role": ..., "content": ...} dicts
    history = history or []
    try:
        client = AsyncInferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1",
                                      token=HUGGINGFACE_API_KEY)
        # text_generation returns the generated text as a plain string
        response = await client.text_generation(message, max_new_tokens=100)
        history.append({"role": "user", "content": message})
        history.append({"role": "assistant", "content": response})
    except Exception as e:
        logger.error(f"Chatbot error: {e}")
        history.append({"role": "user", "content": message})
        history.append({
            "role": "assistant",
            "content": "Error: Could not get a response from the chatbot."
        })
    # Second return value clears the input textbox
    return history, ""
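# With type='messages', the chatbot history is a list of role/content dicts,
# for example (illustrative values only):
#
#     [{"role": "user", "content": "Hi"},
#      {"role": "assistant", "content": "Hello! How can I help?"}]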
async def update_db_status(_db_status_component=None):
    # A Gradio component cannot be updated directly from a background task, so
    # the result is kept in the module-level db_status_message for the UI to read
    global db_status_message
    while True:
        try:
            db_session.execute(text("SELECT 1"))
            db_status_message = "Connected"
        except SQLAlchemyError:
            db_status_message = "Disconnected"
        await asyncio.sleep(60)  # Check every minute
async def update_feed_content():  # uses the global db_session
    try:
        articles = db_session.query(Article).order_by(
            Article.timestamp.desc()).limit(20).all()
        feed = {
            'title': 'Website Changes Feed',
            'link': 'http://yourwebsite.com/feed',
            'description': 'Feed of changes detected on monitored websites.',
            'items': [{
                'title': article.title,
                'link': article.url,
                'description': article.content,
                'pubDate': str(article.timestamp)  # Convert datetime to string
            } for article in articles]
        }
        return feed
    except SQLAlchemyError as e:
        logger.error(f"Database error: {e}")
        return None
async def periodic_update_with_error_handling(db_session):
    while True:
        try:
            await asyncio.sleep(300)  # Wait for 5 minutes
            await update_feed_content()  # takes no arguments; uses the global session
        except Exception as e:
            logger.error(f"Error in periodic update: {e}")
async def main():
    global db_session
    engine = None
    try:
        engine, SessionLocal = await create_db_engine("sqlite:///monitoring.db")
        db_session = SessionLocal()

        demo = gr.Blocks()
        with demo:
            gr.Markdown("# Website Monitor and Chatbot")
            with gr.Row():
                with gr.Column():
                    db_url = gr.Textbox(label="Database URL",
                                        value="sqlite:///monitoring.db")
                    db_status = gr.Textbox(label="Database Status",
                                           interactive=False,
                                           value="Connected")
                with gr.Column():
                    with gr.Tab("Configuration"):
                        target_urls = gr.Textbox(
                            label="Target URLs (comma-separated)",
                            placeholder="https://example.com, https://another-site.com")
                        storage_location = gr.Textbox(
                            label="Storage Location (CSV file path)",
                            placeholder="/path/to/your/file.csv")
                        feed_rss_checkbox = gr.Checkbox(label="Enable RSS Feed")
                        start_button = gr.Button("Start Monitoring")
                        stop_button = gr.Button("Stop Monitoring")
                        status_text = gr.Textbox(label="Status",
                                                 interactive=False)
                        history_text = gr.Textbox(label="History",
                                                  lines=10,
                                                  interactive=False)
                    with gr.Tab("User-End View"):
                        feed_content = gr.JSON(label="RSS Feed Content")
                    with gr.Tab("Chatbot"):
                        chatbot_interface = gr.Chatbot(type='messages')
                        message_input = gr.Textbox(
                            placeholder="Type your message here...")
                        send_button = gr.Button("Send")

            async def on_start_click(target_urls_str: str, storage_loc: str,
                                     feed_enabled: bool):
                urls = [url.strip() for url in target_urls_str.split(",")]
                await start_monitoring(urls,
                                       storage_loc if storage_loc else None,
                                       feed_enabled)
                return "Monitoring started for valid URLs."

            async def on_stop_click():
                for url in list(monitoring_tasks.keys()):
                    stop_monitoring(url)
                return "Monitoring stopped for all URLs."

            start_button.click(
                on_start_click,
                inputs=[target_urls, storage_location, feed_rss_checkbox],
                outputs=[status_text])
            stop_button.click(on_stop_click, outputs=[status_text])
            send_button.click(chatbot_response,
                              inputs=[message_input, chatbot_interface],
                              outputs=[chatbot_interface, message_input])

            # Set up the timer: refresh the feed view every 300 seconds
            feed_updater = gr.Timer(300)
            feed_updater.tick(fn=update_feed_content, outputs=feed_content)

        # Create background tasks
        update_tasks = [
            asyncio.create_task(periodic_update_with_error_handling(db_session)),
            asyncio.create_task(update_db_status(db_status)),
        ]

        # Launch the demo; prevent_thread_lock keeps the event loop free so the
        # background tasks can run, and the gather below keeps main() alive
        # until those tasks are cancelled (e.g. by a signal)
        demo.launch(prevent_thread_lock=True)
        await asyncio.gather(*update_tasks, return_exceptions=True)
    except Exception as e:
        logger.error(f"Error in main: {e}")
    finally:
        if db_session:
            db_session.close()  # synchronous Session: close() is not awaitable
        if engine:
            engine.dispose()
def signal_handler(signum, frame):
    # Cancel every outstanding task so main() can shut down cleanly
    for task in asyncio.all_tasks():
        task.cancel()


if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    asyncio.run(main())
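# Running locally is assumed to look roughly like this (the file name and the
# token value are placeholders, not taken from the original project):
#
#     export HUGGINGFACE_API_KEY=hf_xxx   # required for the chatbot tab
#     python app.py
#
# On Hugging Face Spaces the same module is started automatically and the API
# key would normally be provided as a Space secret.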