import asyncio
import logging
from datetime import datetime
from typing import Any, Dict, List

import gradio as gr
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from pydantic_settings import BaseSettings
from pytz import timezone
from sqlalchemy import Column, DateTime, Integer, String, Text, text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.future import select
from sqlalchemy.orm import declarative_base, sessionmaker
# --- Configuration ---
load_dotenv()  # Load environment variables from .env file

class Settings(BaseSettings):
    # Database fields; pydantic-settings matches environment variables
    # case-insensitively by default, so DB_HOST populates db_host, and so on.
    db_host: str
    db_user: str
    db_password: str
    db_name: str
    database_type: str
    database_port: int
    chatbot_api_key: str
    rss_feed_url: str
    storage_location: str

    class Config:
        # Optional: allow extra fields from the environment
        extra = "allow"

settings = Settings()
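# Example .env consumed by Settings above. All values are hypothetical and
# shown only for illustration; adjust them to your environment:
#
#   DB_HOST=localhost
#   DB_USER=monitor
#   DB_PASSWORD=change-me
#   DB_NAME=monitor_db
#   DATABASE_TYPE=postgresql
#   DATABASE_PORT=5432
#   CHATBOT_API_KEY=your-api-key
#   RSS_FEED_URL=https://example.com/feed.xml
#   STORAGE_LOCATION=./data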
# --- Database Connection ---
def get_db_url(settings: Settings) -> str:
    if settings.database_type == "mysql":
        return f"mysql+aiomysql://{settings.db_user}:{settings.db_password}@{settings.db_host}:{settings.database_port}/{settings.db_name}"
    elif settings.database_type == "postgresql":
        return f"postgresql+asyncpg://{settings.db_user}:{settings.db_password}@{settings.db_host}:{settings.database_port}/{settings.db_name}"
    else:
        return "sqlite+aiosqlite:///default.db"
async def set_db_connection(
    db_type: str = None,
    db_host: str = None,
    db_port: int = None,
    db_user: str = None,
    db_password: str = None,
    db_name: str = None,
):
    global engine, async_session_maker
    try:
        # Update settings if new values provided
        if db_type:
            settings.database_type = db_type
        if db_host:
            settings.db_host = db_host
        if db_port:
            settings.database_port = db_port
        if db_user:
            settings.db_user = db_user
        if db_password:
            settings.db_password = db_password
        if db_name:
            settings.db_name = db_name
        # Dispose of the existing engine, if any
        if engine:
            await engine.dispose()
        # Create a new engine and a session factory; short-lived sessions are
        # opened per operation rather than sharing one global session.
        db_url = get_db_url(settings)
        engine = create_async_engine(db_url, echo=False)
        async_session_maker = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
        # Create tables for the declarative models defined below (Article)
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
        logger.info("Database connection established.")
        return "Database connection established."
    except Exception as e:
        logger.error(f"Failed to establish database connection: {e}")
        return f"Failed to connect to database: {e}"
# --- Database Model ---
Base = declarative_base()

class Article(Base):
    # SQLAlchemy ORM model backing the monitoring feed; a plain Python class
    # cannot be passed to session.add() or select().
    __tablename__ = "articles"

    id = Column(Integer, primary_key=True, autoincrement=True)
    title = Column(String(255))
    url = Column(String(2048))
    content = Column(Text)
    timestamp = Column(DateTime(timezone=True))
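# Example query (illustrative only): fetch the most recent articles once a
# connection has been established via set_db_connection():
#
#   async with async_session_maker() as session:
#       result = await session.execute(
#           select(Article).order_by(Article.timestamp.desc()).limit(10)
#       )
#       latest = result.scalars().all()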
# --- Global Variables ---
engine = None
async_session_maker = None
monitoring_task = None

# basicConfig attaches a handler so log records are actually emitted
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# --- Website Monitoring ---
async def start_monitoring(urls: List[str], storage_loc: str, feed_enabled: bool):
    # storage_loc is accepted for future use (e.g. archiving raw HTML) but is
    # not consumed by the current implementation.
    while True:
        for url in urls:
            try:
                content = await scrape_website(url)
                analysis = await analyze_website_content(content)
                await store_data(url, content, analysis)
                if feed_enabled:
                    await update_feed_content()
            except Exception as e:
                logger.error(f"Error monitoring website {url}: {e}")
        await asyncio.sleep(300)  # Check every 5 minutes
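# A minimal alternative sketch, not part of the original design: the loop above
# visits URLs sequentially, so one slow site delays the rest of the batch;
# asyncio.gather can fan the per-URL work out concurrently instead.
async def monitor_once_concurrently(urls: List[str]):
    async def handle(url: str):
        content = await scrape_website(url)
        analysis = await analyze_website_content(content)
        await store_data(url, content, analysis)

    # return_exceptions=True keeps one failing URL from cancelling the batch
    await asyncio.gather(*(handle(u) for u in urls), return_exceptions=True)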
async def store_data(url: str, content: str, analysis: Dict[str, Any]):
    # Note: `analysis` is not yet persisted; the Article model has no column for it.
    try:
        # Open a short-lived session per write rather than reusing a global one
        async with async_session_maker() as session:
            article = Article(
                title=f"Change on {url}",
                url=url,
                content=content,
                timestamp=datetime.now(timezone("UTC")),
            )
            session.add(article)
            await session.commit()
    except Exception as e:
        logger.error(f"Error storing data: {e}")
async def update_feed_content():
    try:
        # Fetch RSS feed content from the database
        async with async_session_maker() as session:
            result = await session.execute(select(Article))
            feed_content = []
            for article in result.scalars():
                feed_content.append({
                    "title": article.title,
                    "url": article.url,
                    "content": article.content,
                    # isoformat() keeps the payload JSON-serializable for gr.JSON
                    "timestamp": article.timestamp.isoformat() if article.timestamp else None,
                })
            return feed_content
    except Exception as e:
        logger.error(f"Error updating feed content: {e}")
        return []
# --- Website Scraping and Analysis ---
async def scrape_website(url: str) -> str:
    try:
        # requests is synchronous, so run it in a worker thread to avoid
        # blocking the event loop while the page downloads.
        response = await asyncio.to_thread(requests.get, url, timeout=30)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")
        return soup.get_text()
    except requests.exceptions.RequestException as e:
        logger.error(f"Error scraping website {url}: {e}")
        return ""
async def analyze_website_content(content: str) -> Dict[str, Any]:
    try:
        # Naive keyword-count heuristic; a placeholder for real sentiment analysis
        sentiment = "Positive" if content.count("good") > content.count("bad") else "Negative"
        return {"sentiment": sentiment}
    except Exception as e:
        logger.error(f"Error analyzing website content: {e}")
        return {}
# --- Website Traffic Prediction ---
async def predict_website_traffic(url: str) -> Dict[str, Any]:
    try:
        # ... (Your machine learning model for traffic prediction) ...
        return {"traffic": 100}  # Placeholder
    except Exception as e:
        logger.error(f"Error predicting website traffic: {e}")
        return {}
# --- Chatbot Integration ---
async def chatbot_response(message: str, chat_history: List[Dict[str, str]]):
    # Record the user's message first so it shows even if the API call fails
    chat_history.append({"role": "user", "content": message})
    try:
        if not settings.chatbot_api_key:
            raise ValueError("Chatbot API key is not set.")
        # ... (Your code to call the chatbot API) ...
        # Example using a hypothetical API; the blocking requests call runs in
        # a worker thread so it does not stall the event loop:
        response = await asyncio.to_thread(
            requests.post,
            "https://your-chatbot-api.com/chat",
            json={"message": message, "api_key": settings.chatbot_api_key},
            timeout=30,
        )
        response.raise_for_status()
        bot_response = response.json()["response"]
        # gr.Chatbot(type="messages") expects the "assistant" role, not "bot"
        chat_history.append({"role": "assistant", "content": bot_response})
        return chat_history, ""
    except Exception as e:
        logger.error(f"Error calling chatbot API: {e}")
        chat_history.append({"role": "assistant", "content": "Sorry, I'm having trouble responding right now."})
        return chat_history, ""
# --- Database Status ---
async def update_db_status():
    if engine and async_session_maker:
        try:
            async with async_session_maker() as session:
                await session.execute(text("SELECT 1"))
            return "Database connection is active."
        except SQLAlchemyError as e:
            return f"Database error: {e}"
    else:
        return "Database connection not established."
# --- Gradio UI ---
def main():
    with gr.Blocks() as demo:
        gr.Markdown("# Website Monitor and Chatbot")
        with gr.Row():
            with gr.Column():
                gr.Markdown("## Database Settings")
                db_status_textbox = gr.Textbox(label="Database Status", interactive=False)
                status_text = gr.Textbox(label="Status", interactive=False)
                gr.Markdown("## RSS Feed Reader Settings")
                view_button = gr.Button("View Feed")
                target_urls = gr.Textbox(label="Target URLs (comma-separated)", placeholder="https://example.com, https://another-site.com")
                feed_rss_checkbox = gr.Checkbox(label="Enable RSS Feed")
                start_button = gr.Button("Start Monitoring")
                stop_button = gr.Button("Stop Monitoring")
            with gr.Column():
                feed_content = gr.JSON(label="RSS Feed Content")
                chatbot_interface = gr.Chatbot(type="messages")
                message_input = gr.Textbox(placeholder="Type your message here...")
                send_button = gr.Button("Send")
                scrape_button = gr.Button("Scrape Website")
                analyze_button = gr.Button("Analyze Website Content")
                predict_button = gr.Button("Predict Website Traffic")
                scrape_output = gr.Textbox(label="Scraped Website Content", interactive=False)
                analyze_output = gr.JSON(label="Website Content Analysis")
                predict_output = gr.JSON(label="Website Traffic Prediction")

        # --- Button Actions ---
        start_button.click(
            fn=on_start_click,
            inputs=[target_urls, feed_rss_checkbox],
            outputs=[status_text],
        )
        stop_button.click(fn=on_stop_click, outputs=[status_text])
        view_button.click(
            fn=on_view_feed_click,
            inputs=[],
            outputs=[feed_content],
        )
        scrape_button.click(
            fn=on_scrape_click,
            inputs=[target_urls],
            outputs=[scrape_output],
        )
        analyze_button.click(
            fn=on_analyze_click,
            inputs=[scrape_output],
            outputs=[analyze_output],
        )
        predict_button.click(
            fn=on_predict_click,
            inputs=[target_urls],
            outputs=[predict_output],
        )
        send_button.click(
            fn=chatbot_response,
            inputs=[message_input, chatbot_interface],
            outputs=[chatbot_interface, message_input],
        )

        # --- Periodic Updates ---
        # Poll the database every 5 minutes. `every=` schedules the function on
        # Gradio's own event loop, which avoids creating an asyncio task before
        # any loop is running (newer Gradio releases expose gr.Timer for this).
        demo.load(fn=update_feed_content, outputs=[feed_content], every=300)

        # --- Load Database Status ---
        demo.load(fn=update_db_status, outputs=[db_status_textbox])

    # --- Launch Gradio ---
    # launch() is blocking and synchronous, so main() is a plain function;
    # the async event handlers above are awaited on Gradio's internal loop.
    demo.launch()
# --- Helper Functions ---
async def on_start_click(target_urls_str: str, feed_enabled: bool):
    global monitoring_task
    urls = [url.strip() for url in target_urls_str.split(",") if url.strip()]
    if not urls:
        return "No target URLs provided."
    await set_db_connection()
    monitoring_task = asyncio.create_task(start_monitoring(urls, settings.storage_location, feed_enabled))
    return "Monitoring started."

async def on_stop_click():
    global monitoring_task
    if monitoring_task:
        monitoring_task.cancel()
        monitoring_task = None
    return "Monitoring stopped."

async def on_view_feed_click():
    return await update_feed_content()

async def on_scrape_click(target_urls_str: str):
    # The textbox may hold a comma-separated list; scrape the first entry
    url = target_urls_str.split(",")[0].strip()
    return await scrape_website(url)

async def on_analyze_click(content: str):
    return await analyze_website_content(content)

async def on_predict_click(target_urls_str: str):
    url = target_urls_str.split(",")[0].strip()
    return await predict_website_traffic(url)
# --- Main Execution ---
if __name__ == "__main__":
    main()