"""AutoclientAI Search/Email worker service — FastAPI automation app.

NOTE(review): the original file began with Hugging Face Spaces UI residue
("Spaces: / Runtime error") pasted above the code; replaced with this docstring.
"""
import asyncio
import json
import logging
import os
import platform
import smtplib
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Dict, List, Optional

import aiohttp
import boto3
import psutil
import websockets
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from fastapi import FastAPI, HTTPException, BackgroundTasks, WebSocket
from openai import AsyncOpenAI
from prometheus_client import Counter, Gauge, generate_latest
from pydantic import BaseModel
from supabase import create_client, Client

# Version information
WORKER_VERSION = "1.0.1"
# Configure logging with more details | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
handlers=[ | |
logging.StreamHandler() | |
] | |
) | |
logger = logging.getLogger("huggingface_automation") | |
# Prometheus metrics exposed through the /metrics endpoint.
EMAILS_SENT = Counter('emails_sent_total', 'Total number of emails sent')
CURRENT_BATCH_SIZE = Gauge('current_batch_size', 'Current batch processing size')
CPU_USAGE = Gauge('cpu_usage_percent', 'Current CPU usage percentage')
MEMORY_USAGE = Gauge('memory_usage_percent', 'Current memory usage percentage')
PROCESS_TIME = Gauge('process_time_seconds', 'Time taken to process each batch')

# Initialize FastAPI with API prefix.
# Merge resolution: the remote side's app definition supersedes HEAD's —
# it reports the worker version and mounts all routes under /api.
app = FastAPI(
    title="AutoclientAI Search/Email Service",
    description="Distributed automation system for search and email processes",
    version=WORKER_VERSION,
    root_path="/api"  # all routes served under the /api prefix
)
# Initialize Supabase client with error handling.
# Missing Supabase credentials are fatal: the worker cannot persist state
# without them, so the exception is re-raised to abort startup.
try:
    supabase_url = os.environ.get("SUPABASE_URL")
    supabase_key = os.environ.get("SUPABASE_KEY")
    # Remote worker endpoint, reported by the health check; has a default.
    cloudflare_worker_url = os.environ.get("CLOUDFLARE_WORKER_URL", "https://autoclient-worker.trigox.workers.dev/api")
    if not supabase_url or not supabase_key:
        raise ValueError("Missing required Supabase environment variables")
    supabase = create_client(supabase_url, supabase_key)
    logger.info("Supabase client initialized successfully")
except Exception as e:
    logger.error(f"Failed to initialize Supabase client: {str(e)}")
    raise
# Initialize AWS SES client. Unlike Supabase, missing AWS credentials are
# non-fatal: ses is set to None and email functionality degrades gracefully.
try:
    aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID')
    aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
    aws_region = os.environ.get('AWS_REGION')
    if not all([aws_access_key_id, aws_secret_access_key, aws_region]):
        logger.warning("Missing AWS credentials, email functionality will be limited")
        ses = None
    else:
        ses = boto3.client(
            'ses',
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name=aws_region
        )
        logger.info("AWS SES client initialized successfully")
except Exception as e:
    logger.error(f"Failed to initialize AWS SES client: {str(e)}")
    ses = None

# Initialize OpenAI async client (key may be None; calls will then fail at
# request time and are caught by the health check).
openai_client = AsyncOpenAI(api_key=os.environ.get('OPENAI_API_KEY'))
class ProcessMetrics:
    """Tracks process health (CPU, memory, uptime) and a bounded error history.

    get_metrics() also pushes CPU/memory readings into the Prometheus gauges
    so the /metrics endpoint reflects current resource usage.
    """

    MAX_ERRORS = 100  # keep only the most recent errors

    def __init__(self):
        self.start_time = datetime.now()
        self.process = psutil.Process()
        self.errors = []            # bounded list of {"timestamp", "error"} dicts
        self.last_error = None      # most recent exception object, if any
        self.recovery_attempts = 0
        self.status = "initializing"

    def get_metrics(self):
        """Return a snapshot dict of process metrics, or {} on failure."""
        try:
            cpu_percent = self.process.cpu_percent()
            memory_percent = self.process.memory_percent()
            # Mirror readings into Prometheus gauges as a side effect.
            CPU_USAGE.set(cpu_percent)
            MEMORY_USAGE.set(memory_percent)
            return {
                "cpu_percent": cpu_percent,
                "memory_percent": memory_percent,
                "uptime_seconds": (datetime.now() - self.start_time).total_seconds(),
                "thread_count": self.process.num_threads(),
                "status": self.status,
                "last_error": str(self.last_error) if self.last_error else None,
                "recovery_attempts": self.recovery_attempts
            }
        except Exception as e:
            logger.error(f"Error getting metrics: {str(e)}")
            return {}

    def log_error(self, error):
        """Record *error* as the latest failure, capping history at MAX_ERRORS."""
        self.last_error = error
        self.errors.append({
            "timestamp": datetime.now().isoformat(),
            "error": str(error)
        })
        if len(self.errors) > self.MAX_ERRORS:
            self.errors.pop(0)

    def attempt_recovery(self):
        """Mark the process as recovering and count the attempt."""
        self.recovery_attempts += 1
        self.status = "recovering"
        # TODO: implement actual recovery logic

# Singleton used by the endpoints and the background metrics loop.
process_metrics = ProcessMetrics()
class AutomationState:
    """In-memory state of the email automation run, persisted to Supabase.

    Merge resolution: remote's save_to_db (named data dict, state_version,
    logging, process_metrics error capture) supersedes HEAD's inline version;
    the remote-only fields (current_batch, processed_domains, state_version,
    last_updated) are kept.
    """

    def __init__(self):
        self.is_running = False
        self.current_group = None
        self.current_position = 0
        self.total_emails_sent = 0
        self.group_emails_sent = 0
        self.distribution_method = "equitable"
        self.loop_interval = 40            # seconds between automation loop passes
        self.max_emails_per_group = 300
        self.loop_automation = False
        self.current_batch = []
        self.processed_domains = set()
        self.state_version = 1
        self.last_updated = datetime.now()

    async def save_to_db(self):
        """Upsert the current state into the automation_jobs table.

        Returns the Supabase result; raises HTTPException(500) on failure.
        Note: pause is persisted as "stopped" — only running/stopped are
        distinguished in the stored status.
        """
        try:
            data = {
                "status": "running" if self.is_running else "stopped",
                "current_group": self.current_group,
                "current_position": self.current_position,
                "total_emails_sent": self.total_emails_sent,
                "group_emails_sent": self.group_emails_sent,
                "distribution_method": self.distribution_method,
                "loop_interval": self.loop_interval,
                "max_emails_per_group": self.max_emails_per_group,
                "loop_automation": self.loop_automation,
                "state_version": self.state_version,
                "updated_at": datetime.now().isoformat()
            }
            result = await supabase.table("automation_jobs").upsert(data).execute()
            self.last_updated = datetime.now()
            logger.info(f"State saved to database: {json.dumps(data, default=str)}")
            return result
        except Exception as e:
            process_metrics.log_error(e)
            logger.error(f"Failed to save automation state: {str(e)}")
            raise HTTPException(status_code=500, detail=f"Failed to save state: {str(e)}")

# Singleton automation state shared by every endpoint.
automation_state = AutomationState()
# Merge resolution: keep BOTH conflict sides — HEAD's automation control
# handlers AND remote's WebSocket/enhanced health-check additions (remote's
# health_check supersedes HEAD's simpler one).
# NOTE(review): none of these handlers carry @app route decorators in this
# chunk — confirm they are registered as routes elsewhere.

async def start_automation(settings: Dict):
    """Start the automation process with the given settings.

    Optional keys in *settings*: distribution_method, loop_interval,
    max_emails_per_group, loop_automation; missing keys use defaults.
    """
    try:
        automation_state.is_running = True
        automation_state.distribution_method = settings.get("distribution_method", "equitable")
        automation_state.loop_interval = settings.get("loop_interval", 40)
        automation_state.max_emails_per_group = settings.get("max_emails_per_group", 300)
        automation_state.loop_automation = settings.get("loop_automation", False)
        await automation_state.save_to_db()
        logger.info("Automation started with settings: %s", settings)
        return {"status": "started", "settings": settings}
    except Exception as e:
        logger.error(f"Failed to start automation: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))

async def stop_automation():
    """Stop the automation process and persist the state."""
    try:
        automation_state.is_running = False
        await automation_state.save_to_db()
        logger.info("Automation stopped")
        return {"status": "stopped"}
    except Exception as e:
        logger.error(f"Failed to stop automation: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))

async def pause_automation():
    """Pause the automation process.

    NOTE(review): persisted status is "stopped" (save_to_db distinguishes
    only running/stopped); pause vs stop differ only in the response body.
    """
    try:
        automation_state.is_running = False
        await automation_state.save_to_db()
        logger.info("Automation paused")
        return {"status": "paused"}
    except Exception as e:
        logger.error(f"Failed to pause automation: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))

async def resume_automation():
    """Resume a paused automation process."""
    try:
        automation_state.is_running = True
        await automation_state.save_to_db()
        logger.info("Automation resumed")
        return {"status": "resumed"}
    except Exception as e:
        logger.error(f"Failed to resume automation: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))

async def get_automation_status():
    """Return a snapshot dict of the current automation state."""
    try:
        return {
            "is_running": automation_state.is_running,
            "current_group": automation_state.current_group,
            "current_position": automation_state.current_position,
            "total_emails_sent": automation_state.total_emails_sent,
            "group_emails_sent": automation_state.group_emails_sent,
            "distribution_method": automation_state.distribution_method,
            "loop_interval": automation_state.loop_interval,
            "max_emails_per_group": automation_state.max_emails_per_group,
            "loop_automation": automation_state.loop_automation
        }
    except Exception as e:
        logger.error(f"Failed to get automation status: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))

# WebSocket connection manager
class ConnectionManager:
    """Tracks open WebSocket connections and broadcasts messages to them."""

    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the handshake and register the connection."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        # Idempotent: tolerate a connection already dropped during broadcast,
        # so the endpoint's finally-block cannot raise ValueError.
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        """Send *message* to every connection; drop connections that fail."""
        # Iterate over a copy so disconnect() can safely mutate the list.
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except Exception as e:
                logger.error(f"Error broadcasting to WebSocket: {str(e)}")
                # Bug fix: disconnect() is synchronous — awaiting its None
                # return value raised a TypeError in the original.
                self.disconnect(connection)

manager = ConnectionManager()

async def websocket_endpoint(websocket: WebSocket):
    """Stream automation state and process metrics to one client, once per second."""
    await manager.connect(websocket)
    try:
        while True:
            # Send real-time metrics and state updates
            metrics = process_metrics.get_metrics()
            state = {
                "automation_state": {
                    "is_running": automation_state.is_running,
                    "current_group": automation_state.current_group,
                    "total_emails_sent": automation_state.total_emails_sent,
                    "group_emails_sent": automation_state.group_emails_sent
                },
                "metrics": metrics,
                "timestamp": datetime.now().isoformat()
            }
            await websocket.send_json(state)
            await asyncio.sleep(1)  # Update every second
    except Exception as e:
        logger.error(f"WebSocket error: {str(e)}")
    finally:
        manager.disconnect(websocket)

async def health_check():
    """Enhanced health check: probes Supabase and OpenAI, reports process metrics.

    Returns "healthy" when both probes succeed, "degraded" when either fails,
    and "unhealthy" only if the check itself raises.
    """
    try:
        # Test Supabase connection
        supabase_status = "ok"
        try:
            response = await supabase.table("automation_jobs").select("id").limit(1).execute()
        except Exception as e:
            supabase_status = f"error: {str(e)}"
            process_metrics.log_error(e)
        # Test OpenAI connection
        openai_status = "ok"
        try:
            await openai_client.models.list()
        except Exception as e:
            openai_status = f"error: {str(e)}"
            process_metrics.log_error(e)
        # Get process metrics
        metrics = process_metrics.get_metrics()
        return {
            "status": "healthy" if all(x == "ok" for x in [supabase_status, openai_status]) else "degraded",
            "version": WORKER_VERSION,
            "timestamp": datetime.now().isoformat(),
            "environment": os.environ.get("ENVIRONMENT", "production"),
            "process_metrics": metrics,
            "services": {
                "supabase": supabase_status,
                "openai": openai_status,
                "cloudflare_worker": cloudflare_worker_url,
            }
        }
    except Exception as e:
        process_metrics.log_error(e)
        logger.error(f"Health check failed: {str(e)}")
        return {
            "status": "unhealthy",
            "timestamp": datetime.now().isoformat(),
            "error": str(e)
        }
async def metrics():
    """Prometheus metrics endpoint: return the text exposition payload."""
    return generate_latest()

# Update the process status in metrics when the automation state changes
async def update_process_metrics():
    """Background loop: mirror automation state into Prometheus gauges.

    Runs forever; refreshes every 5s, backing off 1s after an error.
    """
    while True:
        try:
            process_metrics.status = "running" if automation_state.is_running else "idle"
            CURRENT_BATCH_SIZE.set(len(automation_state.current_batch))
            await asyncio.sleep(5)
        except Exception as e:
            logger.error(f"Error updating metrics: {str(e)}")
            await asyncio.sleep(1)

async def startup_event():
    """Startup hook: launch the background metrics-updater task.

    NOTE(review): not decorated with @app.on_event("startup") in this chunk —
    confirm it is registered as a startup handler elsewhere.
    """
    asyncio.create_task(update_process_metrics())
if __name__ == "__main__":
    # Run the service directly with uvicorn. PORT defaults to 7860 —
    # presumably chosen for Hugging Face Spaces; confirm deployment target.
    import uvicorn
    port = int(os.environ.get("PORT", 7860))
    uvicorn.run(app, host="0.0.0.0", port=port)