# python-worker/app.py
import os
import json
import asyncio
import boto3
from fastapi import FastAPI, HTTPException, BackgroundTasks, WebSocket, Response
from pydantic import BaseModel
from typing import Dict, List, Optional
import aiohttp
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
import logging
import psutil
import platform
from datetime import datetime
from openai import AsyncOpenAI
from supabase import create_client, Client
from prometheus_client import Counter, Gauge, generate_latest, CONTENT_TYPE_LATEST
import websockets

# Version information
WORKER_VERSION = "1.0.1"
# Configure logging with more details
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)
logger = logging.getLogger("huggingface_automation")

# Prometheus metrics
EMAILS_SENT = Counter('emails_sent_total', 'Total number of emails sent')
CURRENT_BATCH_SIZE = Gauge('current_batch_size', 'Current batch processing size')
CPU_USAGE = Gauge('cpu_usage_percent', 'Current CPU usage percentage')
MEMORY_USAGE = Gauge('memory_usage_percent', 'Current memory usage percentage')
PROCESS_TIME = Gauge('process_time_seconds', 'Time taken to process each batch')

# Initialize FastAPI with API prefix
app = FastAPI(
    title="AutoclientAI Search/Email Service",
    description="Distributed automation system for search and email processes",
    version=WORKER_VERSION,
    root_path="/api"  # Add API prefix
)

# Initialize Supabase client with error handling
try:
    supabase_url = os.environ.get("SUPABASE_URL")
    supabase_key = os.environ.get("SUPABASE_KEY")
    cloudflare_worker_url = os.environ.get("CLOUDFLARE_WORKER_URL", "https://autoclient-worker.trigox.workers.dev/api")
    if not supabase_url or not supabase_key:
        raise ValueError("Missing required Supabase environment variables")
    supabase = create_client(supabase_url, supabase_key)
    logger.info("Supabase client initialized successfully")
except Exception as e:
    logger.error(f"Failed to initialize Supabase client: {str(e)}")
    raise

# Initialize AWS SES client
try:
    aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID')
    aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
    aws_region = os.environ.get('AWS_REGION')
    if not all([aws_access_key_id, aws_secret_access_key, aws_region]):
        logger.warning("Missing AWS credentials, email functionality will be limited")
        ses = None
    else:
        ses = boto3.client(
            'ses',
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name=aws_region
        )
        logger.info("AWS SES client initialized successfully")
except Exception as e:
    logger.error(f"Failed to initialize AWS SES client: {str(e)}")
    ses = None
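
# Illustrative helper (a sketch, not part of the original file): one way the SES
# client above could be wired into the async worker. The parameter names and the
# EMAILS_SENT increment are assumptions; boto3's SES client is synchronous, so
# the send is pushed onto the default executor to avoid blocking the event loop.
async def send_email_via_ses(sender: str, recipient: str, subject: str, body_html: str) -> Optional[str]:
    """Send a single email through SES and return the SES message ID."""
    if ses is None:
        logger.warning("SES client unavailable, skipping send to %s", recipient)
        return None
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(None, lambda: ses.send_email(
        Source=sender,
        Destination={"ToAddresses": [recipient]},
        Message={
            "Subject": {"Data": subject},
            "Body": {"Html": {"Data": body_html}}
        }
    ))
    EMAILS_SENT.inc()  # Update the Prometheus counter defined above
    return response.get("MessageId")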
# Initialize OpenAI client
openai_client = AsyncOpenAI(api_key=os.environ.get('OPENAI_API_KEY'))

class ProcessMetrics:
    def __init__(self):
        self.start_time = datetime.now()
        self.process = psutil.Process()
        self.errors = []
        self.last_error = None
        self.recovery_attempts = 0
        self.status = "initializing"

    def get_metrics(self):
        try:
            cpu_percent = self.process.cpu_percent()
            memory_percent = self.process.memory_percent()
            CPU_USAGE.set(cpu_percent)
            MEMORY_USAGE.set(memory_percent)
            return {
                "cpu_percent": cpu_percent,
                "memory_percent": memory_percent,
                "uptime_seconds": (datetime.now() - self.start_time).total_seconds(),
                "thread_count": self.process.num_threads(),
                "status": self.status,
                "last_error": str(self.last_error) if self.last_error else None,
                "recovery_attempts": self.recovery_attempts
            }
        except Exception as e:
            logger.error(f"Error getting metrics: {str(e)}")
            return {}

    def log_error(self, error):
        self.last_error = error
        self.errors.append({
            "timestamp": datetime.now().isoformat(),
            "error": str(error)
        })
        if len(self.errors) > 100:  # Keep last 100 errors
            self.errors.pop(0)

    def attempt_recovery(self):
        self.recovery_attempts += 1
        self.status = "recovering"
        # Implement recovery logic here

process_metrics = ProcessMetrics()

class AutomationState:
    def __init__(self):
        self.is_running = False
        self.current_group = None
        self.current_position = 0
        self.total_emails_sent = 0
        self.group_emails_sent = 0
        self.distribution_method = "equitable"
        self.loop_interval = 40
        self.max_emails_per_group = 300
        self.loop_automation = False
        self.current_batch = []
        self.processed_domains = set()
        self.state_version = 1
        self.last_updated = datetime.now()

    async def save_to_db(self):
        try:
            data = {
                "status": "running" if self.is_running else "stopped",
                "current_group": self.current_group,
                "current_position": self.current_position,
                "total_emails_sent": self.total_emails_sent,
                "group_emails_sent": self.group_emails_sent,
                "distribution_method": self.distribution_method,
                "loop_interval": self.loop_interval,
                "max_emails_per_group": self.max_emails_per_group,
                "loop_automation": self.loop_automation,
                "state_version": self.state_version,
                "updated_at": datetime.now().isoformat()
            }
            # supabase-py's default client is synchronous, so execute() is not awaited
            result = supabase.table("automation_jobs").upsert(data).execute()
            self.last_updated = datetime.now()
            logger.info(f"State saved to database: {json.dumps(data, default=str)}")
            return result
        except Exception as e:
            process_metrics.log_error(e)
            logger.error(f"Failed to save automation state: {str(e)}")
            raise HTTPException(status_code=500, detail=f"Failed to save state: {str(e)}")
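
    # Illustrative counterpart (a sketch, not part of the original class): restore
    # the most recently persisted state on startup. Column names mirror the
    # save_to_db payload above; ordering by updated_at is an assumption about the
    # automation_jobs table.
    def load_from_db(self):
        try:
            result = supabase.table("automation_jobs").select("*").order(
                "updated_at", desc=True).limit(1).execute()
            if result.data:
                row = result.data[0]
                self.is_running = row.get("status") == "running"
                self.current_group = row.get("current_group")
                self.current_position = row.get("current_position", 0)
                self.total_emails_sent = row.get("total_emails_sent", 0)
                self.group_emails_sent = row.get("group_emails_sent", 0)
                logger.info("Automation state restored from database")
        except Exception as e:
            process_metrics.log_error(e)
            logger.error(f"Failed to load automation state: {str(e)}")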
automation_state = AutomationState()

@app.post("/start")
async def start_automation(settings: Dict):
    """Start automation process with given settings"""
    try:
        automation_state.is_running = True
        automation_state.distribution_method = settings.get("distribution_method", "equitable")
        automation_state.loop_interval = settings.get("loop_interval", 40)
        automation_state.max_emails_per_group = settings.get("max_emails_per_group", 300)
        automation_state.loop_automation = settings.get("loop_automation", False)
        await automation_state.save_to_db()
        logger.info("Automation started with settings: %s", settings)
        return {"status": "started", "settings": settings}
    except Exception as e:
        logger.error(f"Failed to start automation: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
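
# Illustrative request (an assumption about typical values, not part of the
# original file): starting the automation from the command line. The keys match
# the settings.get() lookups above; localhost:7860 assumes the default PORT.
#
#   curl -X POST http://localhost:7860/start \
#        -H "Content-Type: application/json" \
#        -d '{"distribution_method": "equitable", "loop_interval": 40,
#             "max_emails_per_group": 300, "loop_automation": false}'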
@app.post("/stop")
async def stop_automation():
"""Stop automation process"""
try:
automation_state.is_running = False
await automation_state.save_to_db()
logger.info("Automation stopped")
return {"status": "stopped"}
except Exception as e:
logger.error(f"Failed to stop automation: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/pause")
async def pause_automation():
"""Pause automation process"""
try:
automation_state.is_running = False
await automation_state.save_to_db()
logger.info("Automation paused")
return {"status": "paused"}
except Exception as e:
logger.error(f"Failed to pause automation: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/resume")
async def resume_automation():
"""Resume automation process"""
try:
automation_state.is_running = True
await automation_state.save_to_db()
logger.info("Automation resumed")
return {"status": "resumed"}
except Exception as e:
logger.error(f"Failed to resume automation: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/status")
async def get_automation_status():
"""Get current automation status"""
try:
return {
"is_running": automation_state.is_running,
"current_group": automation_state.current_group,
"current_position": automation_state.current_position,
"total_emails_sent": automation_state.total_emails_sent,
"group_emails_sent": automation_state.group_emails_sent,
"distribution_method": automation_state.distribution_method,
"loop_interval": automation_state.loop_interval,
"max_emails_per_group": automation_state.max_emails_per_group,
"loop_automation": automation_state.loop_automation
}
except Exception as e:
logger.error(f"Failed to get automation status: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")
async def health_check():
"""Health check endpoint"""
try:
# Test Supabase connection
response = await supabase.table("automation_jobs").select("id").limit(1).execute()
return {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"supabase_connection": "ok"
}
except Exception as e:
=======

# WebSocket connection manager
class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        # Iterate over a copy so failed connections can be removed mid-loop
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except Exception as e:
                logger.error(f"Error broadcasting to WebSocket: {str(e)}")
                self.disconnect(connection)  # disconnect() is synchronous

manager = ConnectionManager()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            # Send real-time metrics and state updates
            metrics = process_metrics.get_metrics()
            state = {
                "automation_state": {
                    "is_running": automation_state.is_running,
                    "current_group": automation_state.current_group,
                    "total_emails_sent": automation_state.total_emails_sent,
                    "group_emails_sent": automation_state.group_emails_sent
                },
                "metrics": metrics,
                "timestamp": datetime.now().isoformat()
            }
            await websocket.send_json(state)
            await asyncio.sleep(1)  # Update every second
    except Exception as e:
        logger.error(f"WebSocket error: {str(e)}")
    finally:
        manager.disconnect(websocket)
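
# Illustrative client (a sketch, not part of the original file): the stream above
# can be watched with the `websockets` package imported earlier. The URL assumes
# the default PORT and direct access (root_path only applies behind a proxy).
#
#   async def watch():
#       async with websockets.connect("ws://localhost:7860/ws") as ws:
#           while True:
#               print(await ws.recv())
#   asyncio.run(watch())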
@app.get("/health")
async def health_check():
"""Enhanced health check endpoint"""
try:
# Test Supabase connection
supabase_status = "ok"
try:
response = await supabase.table("automation_jobs").select("id").limit(1).execute()
except Exception as e:
supabase_status = f"error: {str(e)}"
process_metrics.log_error(e)
# Test OpenAI connection
openai_status = "ok"
try:
await openai_client.models.list()
except Exception as e:
openai_status = f"error: {str(e)}"
process_metrics.log_error(e)
# Get process metrics
metrics = process_metrics.get_metrics()
return {
"status": "healthy" if all(x == "ok" for x in [supabase_status, openai_status]) else "degraded",
"version": WORKER_VERSION,
"timestamp": datetime.now().isoformat(),
"environment": os.environ.get("ENVIRONMENT", "production"),
"process_metrics": metrics,
"services": {
"supabase": supabase_status,
"openai": openai_status,
"cloudflare_worker": cloudflare_worker_url,
}
}
except Exception as e:
process_metrics.log_error(e)
>>>>>>> 10d35ff (RemoteAdded)
logger.error(f"Health check failed: {str(e)}")
return {
"status": "unhealthy",
"timestamp": datetime.now().isoformat(),
"error": str(e)
}

@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint"""
    # Return the raw exposition-format bytes with the proper content type
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)
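
# Illustrative scrape config (an assumption, not shipped with this repo): how a
# Prometheus server could be pointed at the endpoint above, assuming the default
# PORT and the /api prefix implied by root_path when served behind a proxy:
#
#   scrape_configs:
#     - job_name: "autoclient-worker"
#       metrics_path: /api/metrics
#       static_configs:
#         - targets: ["localhost:7860"]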

# Update the process status in metrics when the automation state changes
async def update_process_metrics():
    while True:
        try:
            process_metrics.status = "running" if automation_state.is_running else "idle"
            CURRENT_BATCH_SIZE.set(len(automation_state.current_batch))
            await asyncio.sleep(5)
        except Exception as e:
            logger.error(f"Error updating metrics: {str(e)}")
            await asyncio.sleep(1)

@app.on_event("startup")
async def startup_event():
    asyncio.create_task(update_process_metrics())

if __name__ == "__main__":
    import uvicorn

    port = int(os.environ.get("PORT", 7860))
    uvicorn.run(app, host="0.0.0.0", port=port)