"""AutoclientAI search/email automation worker.

FastAPI service that exposes control endpoints for the automation state
(start / stop / pause / resume / status), a health check, a Prometheus
metrics endpoint and a WebSocket that streams state and process metrics.

This file is the resolution of a merge between the ``HEAD`` branch (control
endpoints) and the ``RemoteAdded`` branch (metrics, SES/OpenAI clients,
WebSocket, enhanced health check); functionality from both sides is kept.
"""

import asyncio
import inspect
import json
import logging
import os
import platform
import smtplib
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Any, Dict, List, Optional

import aiohttp
import boto3
import psutil
import websockets
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from fastapi import (
    BackgroundTasks,
    FastAPI,
    HTTPException,
    Response,
    WebSocket,
    WebSocketDisconnect,
)
from openai import AsyncOpenAI
from prometheus_client import CONTENT_TYPE_LATEST, Counter, Gauge, generate_latest
from pydantic import BaseModel
from supabase import Client, create_client

# Version information
WORKER_VERSION = "1.0.1"

# Configure logging with more details
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)
logger = logging.getLogger("huggingface_automation")

# Prometheus metrics
EMAILS_SENT = Counter('emails_sent_total', 'Total number of emails sent')
CURRENT_BATCH_SIZE = Gauge('current_batch_size', 'Current batch processing size')
CPU_USAGE = Gauge('cpu_usage_percent', 'Current CPU usage percentage')
MEMORY_USAGE = Gauge('memory_usage_percent', 'Current memory usage percentage')
PROCESS_TIME = Gauge('process_time_seconds', 'Time taken to process each batch')

# Initialize FastAPI with API prefix
app = FastAPI(
    title="AutoclientAI Search/Email Service",
    description="Distributed automation system for search and email processes",
    version=WORKER_VERSION,
    root_path="/api"  # Add API prefix
)

# Only reported by the health check in this file; never contacted here.
cloudflare_worker_url = os.environ.get(
    "CLOUDFLARE_WORKER_URL",
    "https://autoclient-worker.trigox.workers.dev/api",
)

# Initialize Supabase client with error handling
try:
    supabase_url = os.environ.get("SUPABASE_URL")
    supabase_key = os.environ.get("SUPABASE_KEY")

    if not supabase_url or not supabase_key:
        raise ValueError("Missing required Supabase environment variables")

    supabase = create_client(supabase_url, supabase_key)
    logger.info("Supabase client initialized successfully")
except Exception as e:
    logger.error(f"Failed to initialize Supabase client: {str(e)}")
    raise

# Initialize AWS SES client
try:
    aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID')
    aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
    aws_region = os.environ.get('AWS_REGION')

    if not all([aws_access_key_id, aws_secret_access_key, aws_region]):
        logger.warning("Missing AWS credentials, email functionality will be limited")
        ses = None
    else:
        ses = boto3.client(
            'ses',
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name=aws_region
        )
        logger.info("AWS SES client initialized successfully")
except Exception as e:
    logger.error(f"Failed to initialize AWS SES client: {str(e)}")
    ses = None

# Initialize OpenAI client
openai_client = AsyncOpenAI(api_key=os.environ.get('OPENAI_API_KEY'))


async def _execute_query(query: Any) -> Any:
    """Run ``query.execute()`` without blocking the event loop.

    ``create_client`` is understood to return the synchronous Supabase
    client, whose ``execute()`` performs blocking I/O and returns a plain
    (non-awaitable) response, so it is run in the default executor. If the
    result turns out to be awaitable (async client), it is awaited as well.

    Args:
        query: A Supabase query builder exposing ``execute()``.

    Returns:
        Whatever ``execute()`` produces.
    """
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, query.execute)
    if inspect.isawaitable(result):
        result = await result
    return result


class ProcessMetrics:
    """Tracks resource usage, status and recent errors of this process."""

    def __init__(self):
        self.start_time = datetime.now()
        self.process = psutil.Process()
        self.errors = []
        self.last_error = None
        self.recovery_attempts = 0
        self.status = "initializing"

    def get_metrics(self):
        """Return a snapshot of process metrics and update the CPU/memory gauges.

        Returns:
            A dict of metrics, or an empty dict if collecting them failed.
        """
        try:
            cpu_percent = self.process.cpu_percent()
            memory_percent = self.process.memory_percent()

            CPU_USAGE.set(cpu_percent)
            MEMORY_USAGE.set(memory_percent)

            return {
                "cpu_percent": cpu_percent,
                "memory_percent": memory_percent,
                "uptime_seconds": (datetime.now() - self.start_time).total_seconds(),
                "thread_count": self.process.num_threads(),
                "status": self.status,
                "last_error": str(self.last_error) if self.last_error else None,
                "recovery_attempts": self.recovery_attempts
            }
        except Exception as e:
            logger.error(f"Error getting metrics: {str(e)}")
            return {}

    def log_error(self, error):
        """Record *error* as the last error and append it to the error history."""
        self.last_error = error
        self.errors.append({
            "timestamp": datetime.now().isoformat(),
            "error": str(error)
        })
        if len(self.errors) > 100:  # Keep last 100 errors
            self.errors.pop(0)

    def attempt_recovery(self):
        """Count a recovery attempt and mark the process as recovering."""
        self.recovery_attempts += 1
        self.status = "recovering"
        # Implement recovery logic here


process_metrics = ProcessMetrics()


class AutomationState:
    """In-memory automation settings and counters, persistable to Supabase."""

    def __init__(self):
        self.is_running = False
        self.current_group = None
        self.current_position = 0
        self.total_emails_sent = 0
        self.group_emails_sent = 0
        self.distribution_method = "equitable"
        self.loop_interval = 40
        self.max_emails_per_group = 300
        self.loop_automation = False
        self.current_batch = []
        self.processed_domains = set()
        self.state_version = 1
        self.last_updated = datetime.now()

    async def save_to_db(self):
        """Upsert the current state into the ``automation_jobs`` table.

        Returns:
            The response of the upsert query.

        Raises:
            HTTPException: With status 500 if the state could not be saved.
        """
        data = {
            "status": "running" if self.is_running else "stopped",
            "current_group": self.current_group,
            "current_position": self.current_position,
            "total_emails_sent": self.total_emails_sent,
            "group_emails_sent": self.group_emails_sent,
            "distribution_method": self.distribution_method,
            "loop_interval": self.loop_interval,
            "max_emails_per_group": self.max_emails_per_group,
            "loop_automation": self.loop_automation,
            "state_version": self.state_version,
            "updated_at": datetime.now().isoformat()
        }
        try:
            # NOTE(review): no primary key is sent with the upsert; confirm
            # the table schema makes this update a row rather than insert one.
            result = await _execute_query(
                supabase.table("automation_jobs").upsert(data)
            )
            self.last_updated = datetime.now()
            logger.info(f"State saved to database: {json.dumps(data, default=str)}")
            return result
        except Exception as e:
            process_metrics.log_error(e)
            logger.error(f"Failed to save automation state: {str(e)}")
            raise HTTPException(
                status_code=500, detail=f"Failed to save state: {str(e)}"
            ) from e


automation_state = AutomationState()


@app.post("/start")
async def start_automation(settings: Dict):
    """Start automation process with given settings"""
    try:
        automation_state.is_running = True
        automation_state.distribution_method = settings.get("distribution_method", "equitable")
        automation_state.loop_interval = settings.get("loop_interval", 40)
        automation_state.max_emails_per_group = settings.get("max_emails_per_group", 300)
        automation_state.loop_automation = settings.get("loop_automation", False)

        await automation_state.save_to_db()
        logger.info("Automation started with settings: %s", settings)
        return {"status": "started", "settings": settings}
    except HTTPException:
        # save_to_db already logged and built the HTTP error; pass it through.
        raise
    except Exception as e:
        logger.error(f"Failed to start automation: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/stop")
async def stop_automation():
    """Stop automation process"""
    try:
        automation_state.is_running = False
        await automation_state.save_to_db()
        logger.info("Automation stopped")
        return {"status": "stopped"}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to stop automation: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/pause")
async def pause_automation():
    """Pause automation process"""
    try:
        automation_state.is_running = False
        await automation_state.save_to_db()
        logger.info("Automation paused")
        return {"status": "paused"}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to pause automation: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/resume")
async def resume_automation():
    """Resume automation process"""
    try:
        automation_state.is_running = True
        await automation_state.save_to_db()
        logger.info("Automation resumed")
        return {"status": "resumed"}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to resume automation: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/status")
async def get_automation_status():
    """Get current automation status"""
    try:
        return {
            "is_running": automation_state.is_running,
            "current_group": automation_state.current_group,
            "current_position": automation_state.current_position,
            "total_emails_sent": automation_state.total_emails_sent,
            "group_emails_sent": automation_state.group_emails_sent,
            "distribution_method": automation_state.distribution_method,
            "loop_interval": automation_state.loop_interval,
            "max_emails_per_group": automation_state.max_emails_per_group,
            "loop_automation": automation_state.loop_automation
        }
    except Exception as e:
        logger.error(f"Failed to get automation status: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


# WebSocket connection manager
class ConnectionManager:
    """Keeps track of accepted WebSocket connections."""

    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept *websocket* and register it as active."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Unregister *websocket*; a no-op if it was already removed."""
        # broadcast() may have dropped the socket before the endpoint's
        # ``finally`` runs, so removal must tolerate a missing entry.
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        """Send *message* to every active connection, dropping failed ones."""
        # Iterate over a copy: disconnect() mutates the list during the loop.
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except Exception as e:
                logger.error(f"Error broadcasting to WebSocket: {str(e)}")
                self.disconnect(connection)  # synchronous; must not be awaited


manager = ConnectionManager()


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Stream automation state and process metrics to the client every second."""
    await manager.connect(websocket)
    try:
        while True:
            # Send real-time metrics and state updates
            metrics = process_metrics.get_metrics()
            state = {
                "automation_state": {
                    "is_running": automation_state.is_running,
                    "current_group": automation_state.current_group,
                    "total_emails_sent": automation_state.total_emails_sent,
                    "group_emails_sent": automation_state.group_emails_sent
                },
                "metrics": metrics,
                "timestamp": datetime.now().isoformat()
            }
            await websocket.send_json(state)
            await asyncio.sleep(1)  # Update every second
    except WebSocketDisconnect:
        # A client going away is normal operation, not an error.
        logger.info("WebSocket client disconnected")
    except Exception as e:
        logger.error(f"WebSocket error: {str(e)}")
    finally:
        manager.disconnect(websocket)


@app.get("/health")
async def health_check():
    """Enhanced health check endpoint"""
    try:
        # Test Supabase connection
        supabase_status = "ok"
        try:
            await _execute_query(
                supabase.table("automation_jobs").select("id").limit(1)
            )
        except Exception as e:
            supabase_status = f"error: {str(e)}"
            process_metrics.log_error(e)

        # Test OpenAI connection
        openai_status = "ok"
        try:
            await openai_client.models.list()
        except Exception as e:
            openai_status = f"error: {str(e)}"
            process_metrics.log_error(e)

        # Get process metrics
        metrics = process_metrics.get_metrics()

        return {
            "status": "healthy" if all(x == "ok" for x in [supabase_status, openai_status]) else "degraded",
            "version": WORKER_VERSION,
            "timestamp": datetime.now().isoformat(),
            "environment": os.environ.get("ENVIRONMENT", "production"),
            "process_metrics": metrics,
            # Key from the HEAD-side response, kept for existing consumers.
            "supabase_connection": supabase_status,
            "services": {
                "supabase": supabase_status,
                "openai": openai_status,
                "cloudflare_worker": cloudflare_worker_url,
            }
        }
    except Exception as e:
        process_metrics.log_error(e)
        logger.error(f"Health check failed: {str(e)}")
        return {
            "status": "unhealthy",
            "timestamp": datetime.now().isoformat(),
            "error": str(e)
        }


@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint"""
    # generate_latest() returns bytes in the Prometheus text format; send it
    # verbatim with the matching content type instead of letting FastAPI
    # JSON-encode it.
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)


# Update the process status in metrics when the automation state changes
async def update_process_metrics():
    """Refresh the process status and batch-size gauge every five seconds."""
    while True:
        try:
            process_metrics.status = "running" if automation_state.is_running else "idle"
            CURRENT_BATCH_SIZE.set(len(automation_state.current_batch))
            await asyncio.sleep(5)
        except Exception as e:
            logger.error(f"Error updating metrics: {str(e)}")
            await asyncio.sleep(1)


# Strong references to background tasks: the event loop only keeps weak
# references, so an unreferenced task may be garbage-collected mid-flight.
_background_tasks = set()


@app.on_event("startup")
async def startup_event():
    """Launch the background metrics updater when the application starts."""
    task = asyncio.create_task(update_process_metrics())
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)


if __name__ == "__main__":
    import uvicorn
    port = int(os.environ.get("PORT", 7860))
    uvicorn.run(app, host="0.0.0.0", port=port)