import asyncio
import csv
import hashlib
import os
from typing import List, Tuple, Dict, Any, Optional
import datetime
import signal
from xml.etree.ElementTree import Element, SubElement, tostring
import aiohttp
import gradio as gr
from huggingface_hub import AsyncInferenceClient
from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime, text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import declarative_base, sessionmaker
from urllib.parse import urljoin
import logging
import validators
from bs4 import BeautifulSoup
Base = declarative_base()
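# ORM model: one row per stored snapshot of a monitored page.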
class Article(Base):
__tablename__ = 'articles'
id = Column(Integer, primary_key=True)
url = Column('url', String(2048), nullable=False, unique=True) # Increased URL length limit
title = Column('title', String(255))
content = Column('content', Text())
hash_value = Column('hash', String(32))
timestamp = Column('timestamp', DateTime(), default=datetime.datetime.utcnow)
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Configuration
HUGGINGFACE_API_KEY = os.getenv("HUGGINGFACE_API_KEY")
DEFAULT_MONITORING_INTERVAL = 300
MAX_MONITORING_INTERVAL = 600
CHANGE_FREQUENCY_THRESHOLD = 3
# Global variables
monitoring_tasks = {}
url_monitoring_intervals = {}
change_counts = {}
history = []
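# Create the SQLAlchemy engine and sessionmaker, creating any missing tables.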
async def create_db_engine(db_url):
try:
engine = create_engine(db_url)
Base.metadata.create_all(engine)
return engine, sessionmaker(bind=engine)
except SQLAlchemyError as e:
logger.error(f"Database error: {e}")
raise
def sanitize_url(url: str) -> Optional[str]:
    """Return the URL unchanged if it is valid, otherwise None."""
    return url if validators.url(url) else None
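# Download a page with the shared aiohttp session and extract its <title>;
# returns (title, raw HTML).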
async def fetch_url_content(url: str,
                            session: aiohttp.ClientSession) -> Tuple[str, str]:
    async with session.get(url) as response:
        response.raise_for_status()
        content = await response.text()
        soup = BeautifulSoup(content, 'html.parser')
        title = soup.title.string if soup.title else "No Title"
        return title, content
async def save_to_database(session, url: str, title: str, content: str,
                           hash_value: str):
    try:
        article = Article(url=url, title=title, content=content,
                          hash_value=hash_value)
        session.add(article)
        session.commit()
    except SQLAlchemyError as e:
        logger.error(f"Database error: {e}")
        session.rollback()
async def save_to_csv(storage_location: str, url: str, title: str,
                      content: str, timestamp: datetime.datetime):
    try:
        directory = os.path.dirname(storage_location)
        if directory:
            os.makedirs(directory, exist_ok=True)
        with open(storage_location, "a", newline='',
                  encoding="utf-8") as csvfile:
            csv_writer = csv.writer(csvfile)
            csv_writer.writerow([
                timestamp.strftime("%Y-%m-%d %H:%M:%S"), url, title, content
            ])
    except IOError as e:
        logger.error(f"IOError saving to CSV: {e}")
    except Exception as e:
        logger.error(f"Unexpected error saving to CSV: {e}")
async def monitor_url(url: str, interval: int, storage_location: str,
feed_rss: bool, db_session):
previous_hash = ""
async with aiohttp.ClientSession() as session:
while True:
try:
title, content = await fetch_url_content(url, session)
current_hash = hashlib.md5(content.encode('utf-8')).hexdigest()
if current_hash != previous_hash:
previous_hash = current_hash
timestamp = datetime.datetime.now()
if feed_rss:
try:
await save_to_database(db_session, url, title,
content, current_hash)
except SQLAlchemyError as e:
logger.error(
f"Database error while saving {url}: {e}")
if storage_location:
await save_to_csv(storage_location, url, title,
content, timestamp)
history.append(
f"Change detected at {url} on {timestamp.strftime('%Y-%m-%d %H:%M:%S')}"
)
logger.info(f"Change detected at {url}")
change_counts[url] = change_counts.get(url, 0) + 1
if change_counts[url] >= CHANGE_FREQUENCY_THRESHOLD:
interval = max(60, interval // 2)
else:
change_counts[url] = 0
interval = min(interval * 2, MAX_MONITORING_INTERVAL)
url_monitoring_intervals[url] = interval
except aiohttp.ClientError as e:
logger.error(f"Network error monitoring {url}: {e}")
history.append(f"Network error monitoring {url}: {e}")
except Exception as e:
logger.error(f"Unexpected error monitoring {url}: {e}")
history.append(f"Unexpected error monitoring {url}: {e}")
await asyncio.sleep(interval)
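# Spawn one monitoring task per URL; invalid URLs are logged and skipped.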
async def start_monitoring(urls: List[str], storage_location: str,
feed_rss: bool):
global db_session
for url in urls:
if url not in monitoring_tasks:
sanitized_url = sanitize_url(url)
if sanitized_url:
task = asyncio.create_task(
monitor_url(sanitized_url, DEFAULT_MONITORING_INTERVAL,
storage_location, feed_rss, db_session))
monitoring_tasks[sanitized_url] = task
else:
logger.warning(f"Invalid URL: {url}")
history.append(f"Invalid URL: {url}")
return "Monitoring started"
async def cleanup_resources(url: str):
# Add any cleanup logic here, e.g., closing database connections
pass
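# Cancel the monitoring task for a URL and schedule its cleanup.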
def stop_monitoring(url: str):
if url in monitoring_tasks:
monitoring_tasks[url].cancel()
asyncio.create_task(cleanup_resources(url))
del monitoring_tasks[url]
return "Monitoring stopped"
def generate_rss_feed(session):
    """Build an RSS 2.0 XML string for the 20 most recent articles.

    feedparser can only parse feeds, not generate them, so the XML is built
    with the standard-library ElementTree. The caller supplies (and closes)
    the SQLAlchemy session.
    """
    try:
        articles = session.query(Article).order_by(
            Article.timestamp.desc()).limit(20).all()
        rss = Element('rss', version='2.0')
        channel = SubElement(rss, 'channel')
        SubElement(channel, 'title').text = 'Website Changes Feed'
        SubElement(channel, 'link').text = 'http://yourwebsite.com/feed'  # Replace if needed
        SubElement(channel, 'description').text = (
            'Feed of changes detected on monitored websites.')
        for article in articles:
            item = SubElement(channel, 'item')
            SubElement(item, 'title').text = article.title
            SubElement(item, 'link').text = article.url
            SubElement(item, 'description').text = article.content
            SubElement(item, 'pubDate').text = str(article.timestamp)
        return tostring(rss, encoding='unicode')
    except SQLAlchemyError as e:
        logger.error(f"Database error: {e}")
        return None
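# Chat handler for the Chatbot tab, backed by the Hugging Face Inference API.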
async def chatbot_response(message: str, history: List[Dict[str, str]]):
    """Answer a chat message via the Hugging Face Inference API.

    Uses the async client so the event loop is not blocked; the history uses
    the "messages" format expected by gr.Chatbot(type='messages').
    """
    history = history or []
    history.append({"role": "user", "content": message})
    try:
        client = AsyncInferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1",
                                      token=HUGGINGFACE_API_KEY)
        response = await client.text_generation(message, max_new_tokens=100)
        history.append({"role": "assistant", "content": response})
    except Exception as e:
        logger.error(f"Chatbot error: {e}")
        history.append({
            "role": "assistant",
            "content": "Error: Could not get a response from the chatbot."
        })
    return history, ""
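# Background task: probe the database once a minute with a trivial SELECT.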
async def update_db_status(db_status):
    while True:
        try:
            db_session.execute(text("SELECT 1"))
            db_status = "Connected"
        except SQLAlchemyError:
            db_status = "Disconnected"
        await asyncio.sleep(60)  # Check every minute
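# Build the feed dictionary shown in the "User-End View" tab.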
async def update_feed_content():
    try:
        articles = db_session.query(Article).order_by(
            Article.timestamp.desc()).limit(20).all()
        feed = {
            'title': 'Website Changes Feed',
            'link': 'http://yourwebsite.com/feed',
            'description': 'Feed of changes detected on monitored websites.',
            'items': [{
                'title': article.title,
                'link': article.url,
                'description': article.content,
                'pubDate': str(article.timestamp)  # Convert datetime to string
            } for article in articles]
        }
        return feed
    except SQLAlchemyError as e:
        logger.error(f"Database error: {e}")
        return None
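# Refresh the feed every five minutes, logging (rather than raising) errors.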
async def periodic_update_with_error_handling(db_session):
while True:
try:
await asyncio.sleep(300) # Wait for 5 minutes
            await update_feed_content()
except Exception as e:
logger.error(f"Error in periodic update: {e}")
async def main():
    global db_session
    engine = None
    db_session = None
    try:
        engine, Session = await create_db_engine("sqlite:///monitoring.db")
        db_session = Session()
demo = gr.Blocks()
with demo:
gr.Markdown("# Website Monitor and Chatbot")
with gr.Row():
with gr.Column():
db_url = gr.Textbox(
label="Database URL", value="sqlite:///monitoring.db")
db_status = gr.Textbox(label="Database Status",
interactive=False,
value="Connected")
with gr.Column():
with gr.Tab("Configuration"):
target_urls = gr.Textbox(
label="Target URLs (comma-separated)",
placeholder=
"https://example.com, https://another-site.com")
storage_location = gr.Textbox(
label="Storage Location (CSV file path)",
placeholder="/path/to/your/file.csv")
feed_rss_checkbox = gr.Checkbox(label="Enable RSS Feed")
start_button = gr.Button("Start Monitoring")
stop_button = gr.Button("Stop Monitoring")
status_text = gr.Textbox(label="Status",
interactive=False)
history_text = gr.Textbox(label="History",
lines=10,
interactive=False)
with gr.Tab("User-End View"):
feed_content = gr.JSON(label="RSS Feed Content")
with gr.Tab("Chatbot"):
chatbot_interface = gr.Chatbot(type='messages')
message_input = gr.Textbox(
placeholder="Type your message here...")
send_button = gr.Button("Send")
async def on_start_click(target_urls_str: str, storage_loc: str,
feed_enabled: bool):
urls = [url.strip() for url in target_urls_str.split(",")]
await start_monitoring(urls,
storage_loc if storage_loc else None,
feed_enabled)
return "Monitoring started for valid URLs."
async def on_stop_click():
for url in list(monitoring_tasks.keys()):
stop_monitoring(url)
return "Monitoring stopped for all URLs."
start_button.click(
on_start_click,
inputs=[target_urls, storage_location, feed_rss_checkbox],
outputs=[status_text])
stop_button.click(on_stop_click, outputs=[status_text])
send_button.click(
chatbot_response,
inputs=[message_input, chatbot_interface],
outputs=[chatbot_interface, message_input])
            # Set up the timer (ticks every 300 seconds)
            feed_updater = gr.Timer(300)
feed_updater.tick(fn=update_feed_content,
outputs=feed_content)
# Create background tasks
update_tasks = [
asyncio.create_task(periodic_update_with_error_handling(db_session)),
asyncio.create_task(update_db_status(db_status))
]
            # Launch the demo without blocking the event loop so the
            # background tasks keep running
            demo.launch(prevent_thread_lock=True)
            # Run the background tasks until they are cancelled
            # (e.g. by the signal handler)
            await asyncio.gather(*update_tasks, return_exceptions=True)
except Exception as e:
logger.error(f"Error in main: {e}")
    finally:
        if db_session:
            db_session.close()
        if engine:
            engine.dispose()
def signal_handler(signum, frame):
    for task in asyncio.all_tasks():
        task.cancel()
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
asyncio.run(main())