"""FastAPI service that sends a chat prompt to every loaded llama.cpp model.

Each configured GGUF model is loaded once at import time. The
``/generate_chat`` endpoint runs the prompt through all loaded models in
parallel, cleans up the raw completions and returns one selected reply
together with every individual model answer.
"""
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from difflib import SequenceMatcher

import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from llama_cpp import Llama
from pydantic import BaseModel
from tqdm import tqdm

# Load environment variables
load_dotenv()

# Initialise the FastAPI application
app = FastAPI()

# Global dictionary holding the loaded models
global_data = {
    'models': []
}

# Model configuration: one entry per GGUF file to load
model_configs = [
    {
        "repo_id": "Ffftdtd5dtft/Meta-Llama-3.1-8B-Instruct-Q2_K-GGUF",
        "filename": "meta-llama-3.1-8b-instruct-q2_k.gguf",
        "name": "Meta Llama 3.1-8B Instruct",
    },
]


class ModelManager:
    """Load the models described in ``model_configs``."""

    def __init__(self):
        # Filled by load_all_models(); each entry is {"model": Llama, "name": str}.
        self.models = []

    def load_model(self, model_config):
        """Load one model.

        Args:
            model_config: Mapping with the keys ``repo_id``, ``filename``
                and ``name``.

        Returns:
            A dict ``{"model": <Llama instance>, "name": <display name>}``.
        """
        print(f"Cargando modelo: {model_config['name']}...")
        return {
            "model": Llama.from_pretrained(
                repo_id=model_config['repo_id'],
                filename=model_config['filename'],
            ),
            "name": model_config['name'],
        }

    def load_all_models(self):
        """Load every entry of ``model_configs`` in parallel.

        A model that fails to load is reported on stdout and skipped, so the
        returned list may be shorter than ``model_configs`` (or empty).

        Returns:
            The list of successfully loaded model dicts, in completion order.
            The same list is also stored on ``self.models``.
        """
        print("Iniciando carga de modelos...")
        models = []
        # ThreadPoolExecutor rejects max_workers=0, so skip the pool entirely
        # when nothing is configured.
        if model_configs:
            with ThreadPoolExecutor(max_workers=len(model_configs)) as executor:
                futures = [
                    executor.submit(self.load_model, config)
                    for config in model_configs
                ]
                for future in tqdm(
                    as_completed(futures),
                    total=len(futures),
                    desc="Cargando modelos",
                    unit="modelo",
                ):
                    try:
                        model = future.result()
                    except Exception as e:
                        # Best effort: one broken model must not prevent the
                        # others from being served.
                        print(f"Error al cargar el modelo: {e}")
                    else:
                        models.append(model)
                        print(f"Modelo cargado exitosamente: {model['name']}")
        self.models = models
        print("Todos los modelos han sido cargados.")
        return models


# Instantiate the ModelManager and load the models
model_manager = ModelManager()
global_data['models'] = model_manager.load_all_models()


class ChatRequest(BaseModel):
    """Request body of the ``/generate_chat`` endpoint."""

    message: str
    top_k: int = 50
    top_p: float = 0.95
    temperature: float = 0.7


def generate_chat_response(request, model_data):
    """Run the request's message through one model.

    Args:
        request: Object exposing ``message``, ``top_k``, ``top_p`` and
            ``temperature`` (a ``ChatRequest`` in this file).
        model_data: Dict with the keys ``model`` and ``name`` as produced by
            ``ModelManager.load_model``.

    Returns:
        A dict with the keys ``response`` (the model reply, or
        ``"Error: ..."`` when generation failed), ``literal`` (the normalised
        prompt) and ``model_name``.
    """
    # Normalise outside the try block so the error path below can always
    # reference user_input.
    user_input = normalize_input(request.message)
    try:
        llm = model_data['model']
        response = llm.create_chat_completion(
            messages=[{"role": "user", "content": user_input}],
            top_k=request.top_k,
            top_p=request.top_p,
            temperature=request.temperature,
        )
        reply = response['choices'][0]['message']['content']
    except Exception as e:
        return {
            "response": f"Error: {str(e)}",
            "literal": user_input,
            "model_name": model_data['name'],
        }
    return {
        "response": reply,
        "literal": user_input,
        "model_name": model_data['name'],
    }


def normalize_input(input_text):
    """Return *input_text* without leading and trailing whitespace."""
    return input_text.strip()


def remove_duplicates(text):
    """Strip prompt-echo artefacts and repeated lines from *text*.

    Collapses consecutive repetitions of two known echoed prompts, removes
    every ``[/INST]`` marker, and keeps only the first occurrence of each
    line (order preserved).
    """
    text = re.sub(
        r'(Hello there, how are you\? \[/INST\]){2,}',
        'Hello there, how are you? [/INST]',
        text,
    )
    text = re.sub(r'(How are you\? \[/INST\]){2,}', 'How are you? [/INST]', text)
    text = text.replace('[/INST]', '')
    lines = text.split('\n')
    # dict.fromkeys de-duplicates while keeping first-seen order.
    unique_lines = list(dict.fromkeys(lines))
    return '\n'.join(unique_lines).strip()


def remove_repetitive_responses(responses):
    """Drop response dicts whose cleaned text was already seen.

    Args:
        responses: Iterable of dicts carrying a ``response`` string.

    Returns:
        A new list with the first dict for each distinct cleaned text, in the
        original order. The dicts themselves are not modified.
    """
    seen = set()
    unique_responses = []
    for response in responses:
        normalized_response = remove_duplicates(response['response'])
        if normalized_response not in seen:
            seen.add(normalized_response)
            unique_responses.append(response)
    return unique_responses


def select_best_response(responses):
    """Pick one reply text out of the per-model response dicts.

    Args:
        responses: List of dicts carrying a ``response`` string.

    Returns:
        The selected cleaned reply text, or ``None`` when *responses* is
        empty.
    """
    print("Filtrando respuestas...")
    unique = remove_repetitive_responses(responses)
    # The cleaned texts are already pairwise distinct after the step above,
    # so no further set() pass is needed; keeping the list also keeps the
    # ordering (and therefore tie-breaking) deterministic.
    texts = [remove_duplicates(response['response']) for response in unique]
    coherent_responses = filter_by_coherence(texts)
    return filter_by_similarity(coherent_responses)


def filter_by_coherence(responses):
    """Return *responses* ordered from longest to shortest.

    Length is the only criterion used here. The input list is left
    untouched; a new list is returned.
    """
    print("Ordenando respuestas por coherencia...")
    return sorted(responses, key=len, reverse=True)


def filter_by_similarity(responses):
    """Choose a reply by comparing candidates against the longest one.

    The candidates are ranked by length (longest first). The first shorter
    candidate whose ``SequenceMatcher`` ratio against the longest one is
    below 0.9 is returned; when every candidate is at least that similar,
    the longest one is returned.

    Args:
        responses: List of reply strings.

    Returns:
        The chosen string, or ``None`` when *responses* is empty.
    """
    print("Filtrando respuestas por similitud...")
    ranked = sorted(responses, key=len, reverse=True)
    if not ranked:
        return None
    longest = ranked[0]
    for candidate in ranked[1:]:
        if SequenceMatcher(None, longest, candidate).ratio() < 0.9:
            return candidate
    return longest


def worker_function(model_data, request):
    """Thread-pool entry point: generate one model's reply for *request*."""
    print(f"Generando respuesta con el modelo: {model_data['name']}...")
    response = generate_chat_response(request, model_data)
    return response


@app.post("/generate_chat")
async def generate_chat(request: ChatRequest):
    """Answer a chat message using every loaded model.

    Returns:
        A dict with ``best_response`` (the selected reply text) and
        ``all_responses`` (the list of per-model response dicts).

    Raises:
        HTTPException: 400 when the message is blank, 503 when no model is
            loaded, 500 when no model produced a response.
    """
    if not request.message.strip():
        raise HTTPException(status_code=400, detail="The message cannot be empty.")

    models = global_data['models']
    if not models:
        # Without this guard ThreadPoolExecutor(max_workers=0) raises
        # ValueError and the client only sees an opaque 500.
        raise HTTPException(status_code=503, detail="No models are loaded.")

    print(f"Procesando solicitud: {request.message}")

    responses = []
    num_models = len(models)

    # NOTE(review): this blocks the event loop until every model has
    # answered, which also means requests are processed one at a time.
    # Offloading it would allow concurrent calls into the same Llama
    # instance - confirm that is safe before changing it.
    with ThreadPoolExecutor(max_workers=num_models) as executor:
        futures = [
            executor.submit(worker_function, model_data, request)
            for model_data in models
        ]
        for future in tqdm(
            as_completed(futures),
            total=num_models,
            desc="Generando respuestas",
            unit="modelo",
        ):
            try:
                response = future.result()
                responses.append(response)
            except Exception as exc:
                print(f"Error en la generación de respuesta: {exc}")

    if not responses:
        raise HTTPException(status_code=500, detail="No model produced a response.")

    best_response = select_best_response(responses)

    print(f"Mejor respuesta seleccionada: {best_response}")

    return {
        "best_response": best_response,
        "all_responses": responses
    }


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)