import gc
import json
import multiprocessing
import os
import uuid

import numpy as np
import psutil
import requests
import torch
from diffusers import StableDiffusionImg2ImgPipeline, StableDiffusionPipeline
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import FileResponse, HTMLResponse
from huggingface_hub import HfApi, HfFolder
from PIL import Image
from transformers import (
    AutoFeatureExtractor,
    AutoModelForAudioClassification,
    AutoModelForCausalLM,
    AutoModelForSeq2SeqLM,
    AutoModelForSequenceClassification,
    AutoModelForTextToWaveform,
    AutoTokenizer,
    GPT2LMHeadModel,
    GPT2Tokenizer,
    Trainer,
    TrainingArguments,
    pipeline,
)

load_dotenv()

app = FastAPI()

default_language = "es"

# In-process key/value store used in place of files on disk.
RAM_STORAGE = {}

gpt2_variants = ["gpt2", "gpt2-medium", "gpt2-large", "gpt2-xl"]

# Registries of loaded models / tokenizers, keyed by a short name.
# A value is None when the corresponding download failed (see download_model).
global_models = {}
global_tokenizers = {}


def download_model(model_class, pretrained_name):
    """Load *pretrained_name* through ``model_class.from_pretrained``.

    One normal attempt is made; if it raises, a single retry is made with
    ``force_download=True``.

    Args:
        model_class: Any class exposing a ``from_pretrained`` classmethod.
        pretrained_name: Identifier passed to ``from_pretrained``.

    Returns:
        The loaded object, or ``None`` when both attempts fail. Callers must
        be prepared for ``None``.
    """
    # The broad ``except Exception`` is deliberate: this is a best-effort
    # loader and every failure is reported before falling through.
    try:
        return model_class.from_pretrained(
            pretrained_name, local_files_only=False, resume_download=True
        )
    except Exception as first_error:
        print(f"Error downloading {pretrained_name}: {first_error}")
        print("Retrying download...")

    try:
        return model_class.from_pretrained(
            pretrained_name,
            local_files_only=False,
            resume_download=True,
            force_download=True,
        )
    except Exception as retry_error:
        print(f"Failed to download after retry: {retry_error}")
        return None


# Everything below is loaded eagerly at import time.
for variant in gpt2_variants:
    global_tokenizers[variant] = download_model(AutoTokenizer, variant)
    global_models[variant] = download_model(AutoModelForCausalLM, variant)

global_models["musicgen-small"] = download_model(
    AutoModelForTextToWaveform, "facebook/musicgen-small"
)
global_tokenizers["musicgen-small"] = download_model(
    AutoTokenizer, "facebook/musicgen-small"
)
global_models["stable-diffusion-v1-4"] = download_model(
    StableDiffusionPipeline, "CompVis/stable-diffusion-v1-4"
)
global_models["stable-diffusion-2-1"] = download_model(
    StableDiffusionPipeline, "stabilityai/stable-diffusion-2-1"
)
global_tokenizers["codegen-350M-mono"] = download_model(
    AutoTokenizer, "Salesforce/codegen-350M-mono"
)
global_models["codegen-350M-mono"] = download_model(
    AutoModelForCausalLM, "Salesforce/codegen-350M-mono"
)
global_tokenizers["bart-large-cnn"] = download_model(
    AutoTokenizer, "facebook/bart-large-cnn"
)
global_models["bart-large-cnn"] = download_model(
    AutoModelForSeq2SeqLM, "facebook/bart-large-cnn"
)
global_tokenizers["m2m100_418M"] = download_model(
    AutoTokenizer, "facebook/m2m100_418M"
)
global_models["m2m100_418M"] = download_model(
    AutoModelForSeq2SeqLM, "facebook/m2m100_418M"
)
global_tokenizers["wav2vec2-base"] = download_model(
    AutoFeatureExtractor, "facebook/wav2vec2-base"
)
global_models["wav2vec2-base"] = download_model(
    AutoModelForAudioClassification, "facebook/wav2vec2-base"
)
global_tokenizers[
    "distilbert-base-uncased-finetuned-sst-2-english"
] = download_model(AutoTokenizer, "distilbert-base-uncased-finetuned-sst-2-english")
global_models["distilbert-base-uncased-finetuned-sst-2-english"] = download_model(
    AutoModelForSequenceClassification,
    "distilbert-base-uncased-finetuned-sst-2-english",
)
global_models["stable-diffusion-2-1-base"] = download_model(
    StableDiffusionImg2ImgPipeline, "stabilityai/stable-diffusion-2-1-base"
)


class ChatbotService:
    """Pairs one model with one tokenizer taken from the module registries.

    ``model`` / ``tokenizer`` are ``None`` when the registry has no entry
    under the given name (or holds ``None`` because the download failed).
    """

    def __init__(self, model_name="response_model", tokenizer_name="response_tokenizer"):
        self.model_name = model_name
        self.tokenizer_name = tokenizer_name
        self.model = self.load_model()
        self.tokenizer = self.load_tokenizer()

    def get_response(self, user_id, message, language=default_language):
        """Return the model's reply to *message*.

        Args:
            user_id: Accepted for interface compatibility; not used here.
            message: Text to answer (callers pass recent conversation turns).
            language: Accepted for interface compatibility; not used here.

        Returns:
            The generated text with the prompt removed, the predicted class
            label for models that cannot generate text, or a fixed
            "not ready" message when model or tokenizer is missing.
        """
        if self.model is None or self.tokenizer is None:
            return "El modelo aún no está listo. Por favor, inténtelo de nuevo más tarde."

        # Classification-only models (e.g. the sentiment model) reject
        # ``.generate()``; answer with their predicted label instead.
        if not self._can_generate():
            return self._classify(message)

        input_text = f"Usuario: {message} Asistente:"
        input_ids = self.tokenizer.encode(input_text, return_tensors="pt").to("cpu")
        # NOTE(review): ``max_length`` counts prompt tokens too, so prompts of
        # 100+ tokens leave no room for a reply; consider ``max_new_tokens``.
        with torch.no_grad():
            output = self.model.generate(
                input_ids=input_ids,
                max_length=100,
                num_beams=5,
                no_repeat_ngram_size=2,
                early_stopping=True,
            )
        response = self.tokenizer.decode(output[0], skip_special_tokens=True)
        return response.replace(input_text, "").strip()

    def _can_generate(self):
        """Report whether the wrapped model supports ``.generate()``."""
        probe = getattr(self.model, "can_generate", None)
        # Objects without the probe are assumed to generate, as before.
        return probe() if callable(probe) else True

    def _classify(self, message):
        """Run a single forward pass and return the arg-max label name."""
        inputs = self.tokenizer(message, return_tensors="pt", truncation=True)
        with torch.no_grad():
            logits = self.model(**inputs).logits
        label_id = int(logits.argmax(dim=-1)[0])
        id2label = getattr(self.model.config, "id2label", None) or {}
        return str(id2label.get(label_id, label_id))

    def load_model(self):
        """Look up this service's model in ``global_models`` (None if absent)."""
        return global_models.get(self.model_name)

    def load_tokenizer(self):
        """Look up this service's tokenizer in ``global_tokenizers`` (None if absent)."""
        return global_tokenizers.get(self.tokenizer_name)

    def save_model(self):
        """Write the current model back into ``global_models``."""
        global_models[self.model_name] = self.model

    def save_tokenizer(self):
        """Write the current tokenizer back into ``global_tokenizers``."""
        global_tokenizers[self.tokenizer_name] = self.tokenizer


chatbot_services = {}
for variant in gpt2_variants:
    chatbot_services[variant] = ChatbotService(model_name=variant, tokenizer_name=variant)

programming_service = ChatbotService(
    model_name="codegen-350M-mono", tokenizer_name="codegen-350M-mono"
)
summarization_service = ChatbotService(
    model_name="bart-large-cnn", tokenizer_name="bart-large-cnn"
)
translation_service = ChatbotService(
    model_name="m2m100_418M", tokenizer_name="m2m100_418M"
)
sentiment_analysis_service = ChatbotService(
    model_name="distilbert-base-uncased-finetuned-sst-2-english",
    tokenizer_name="distilbert-base-uncased-finetuned-sst-2-english",
)

# ``model_variant`` values of /process that map to a task-specific service.
_TASK_SERVICES = {
    "programming": programming_service,
    "summarization": summarization_service,
    "translation": translation_service,
    "sentiment_analysis": sentiment_analysis_service,
}


class UnifiedModel(AutoModelForSequenceClassification):
    """Namespace for loading the 3-label GPT-2 classifier used by /process training."""

    def __init__(self, config):
        super().__init__(config)

    @staticmethod
    def load_model():
        """Return the cached unified model, loading it from "gpt2" on first use."""
        model_name = "unified_model"
        if model_name in RAM_STORAGE:
            return RAM_STORAGE[model_name]

        model = UnifiedModel.from_pretrained("gpt2", num_labels=3)
        # GPT-2 ships without a padding token; the classification head needs
        # one to locate the last real token when a batch holds >1 sequence.
        if model.config.pad_token_id is None:
            model.config.pad_token_id = model.config.eos_token_id
        RAM_STORAGE[model_name] = model
        return model


class SyntheticDataset(torch.utils.data.Dataset):
    """Dataset over ``{"text": ..., "label": ...}`` dicts, tokenized on access."""

    def __init__(self, tokenizer, data):
        self.tokenizer = tokenizer
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        item = self.data[idx]
        text = item["text"]
        label = item["label"]
        tokens = self.tokenizer(
            text, padding="max_length", truncation=True, max_length=128, return_tensors="pt"
        )
        return {
            "input_ids": tokens["input_ids"].squeeze(),
            "attention_mask": tokens["attention_mask"].squeeze(),
            "labels": label,
        }


# user_id -> list of messages received from that user.
conversation_history = {}

tokenizer_name = "unified_tokenizer"
tokenizer = AutoTokenizer.from_pretrained("gpt2")
tokenizer.pad_token = tokenizer.eos_token

unified_model = UnifiedModel.load_model()
unified_model.to(torch.device("cpu"))

musicgen_tokenizer = global_tokenizers["musicgen-small"]
musicgen_model = global_models["musicgen-small"]

# download_model() yields None on failure, so guard before touching the pipelines.
stable_diffusion_pipeline = global_models["stable-diffusion-v1-4"]
if stable_diffusion_pipeline is not None:
    stable_diffusion_pipeline.enable_model_cpu_offload()

img2img_pipeline = global_models["stable-diffusion-2-1-base"]
if img2img_pipeline is not None:
    img2img_pipeline.enable_model_cpu_offload()

# Handles of the background training processes started at application startup.
_background_workers = []


@app.on_event("startup")
async def startup_event():
    """Start one background process per training loop.

    Workers are daemonic so that they are terminated with the server instead
    of keeping the interpreter alive at shutdown.

    NOTE(review): RAM_STORAGE and the model registries are plain in-process
    objects. Each worker process operates on its own copy, so items queued by
    the request handlers after startup are not visible to the workers and
    weights trained there are not visible to the server. Confirm whether
    threads or a shared queue were intended.
    """
    targets = (
        train_unified_model,
        auto_learn_music,
        auto_learn_images,
        train_programming_model,
        train_summarization_model,
        train_translation_model,
        train_sentiment_analysis_model,
        train_image_editing_model,
    )
    for target in targets:
        worker = multiprocessing.Process(target=target, name=target.__name__, daemon=True)
        worker.start()
        _background_workers.append(worker)


@app.post("/generate_image")
async def generate_image(request: Request):
    """Generate a PNG from the JSON field ``prompt`` with Stable Diffusion v1-4.

    Raises:
        HTTPException: 400 when ``prompt`` is missing, 503 when the pipeline
            failed to load.
    """
    data = await request.json()
    prompt = data.get("prompt")
    if not prompt:
        raise HTTPException(status_code=400, detail="Missing 'prompt' in request body.")
    if stable_diffusion_pipeline is None:
        raise HTTPException(status_code=503, detail="Image generation model is not available.")

    image = stable_diffusion_pipeline(prompt).images[0]
    # NOTE(review): the output path is shared by all requests, so concurrent
    # calls can overwrite each other's file before it is sent.
    image_path = "generated_image.png"
    image.save(image_path)
    RAM_STORAGE[image_path] = image.tobytes()
    del image  # Release memory
    gc.collect()
    return FileResponse(image_path, media_type="image/png")


@app.post("/edit_image")
async def edit_image(request: Request):
    """Edit the image at ``image_url`` according to ``prompt`` (img2img).

    Raises:
        HTTPException: 400 when a field is missing or the image cannot be
            fetched/decoded, 503 when the pipeline failed to load.
    """
    data = await request.json()
    image_url = data.get("image_url")
    prompt = data.get("prompt")
    if not image_url or not prompt:
        raise HTTPException(
            status_code=400, detail="Missing 'image_url' or 'prompt' in request body."
        )
    if img2img_pipeline is None:
        raise HTTPException(status_code=503, detail="Image editing model is not available.")

    # NOTE(review): the URL is caller-supplied and fetched server-side;
    # consider restricting allowed hosts.
    try:
        download = requests.get(image_url, stream=True, timeout=30)
        download.raise_for_status()
        download.raw.decode_content = True  # undo gzip/deflate transfer encoding
        image = Image.open(download.raw).convert("RGB")
    except (requests.RequestException, OSError) as exc:
        raise HTTPException(
            status_code=400, detail=f"Could not load image from 'image_url': {exc}"
        ) from exc

    edited_image = img2img_pipeline(prompt=prompt, image=image).images[0]
    edited_image_path = "edited_image.png"
    edited_image.save(edited_image_path)
    RAM_STORAGE[edited_image_path] = edited_image.tobytes()
    del image, edited_image  # Release memory
    gc.collect()
    return FileResponse(edited_image_path, media_type="image/png")


def _enqueue_training_examples(examples):
    """Append one item (current tokenizer vocab + *examples*) to the unified queue."""
    training_queue_path = "training_queue.json"
    queued = json.loads(RAM_STORAGE.get(training_queue_path, "[]"))
    # NOTE(review): the full vocabulary is serialized with every item, which
    # makes the queue grow quickly; kept because the worker reads this shape.
    queued.append({"tokenizers": {tokenizer_name: tokenizer.get_vocab()}, "data": examples})
    RAM_STORAGE[training_queue_path] = json.dumps(queued)


def _select_service(model_variant):
    """Map a ``model_variant`` value to a service, defaulting to plain GPT-2."""
    if model_variant in chatbot_services:
        return chatbot_services[model_variant]
    return _TASK_SERVICES.get(model_variant, chatbot_services["gpt2"])


@app.post("/process")
async def process(request: Request):
    """Queue training data (``train``) or answer a chat ``message``.

    Raises:
        HTTPException: 400 when the body has neither ``train`` nor ``message``.
    """
    data = await request.json()

    if data.get("train"):
        user_data = data.get("user_data", [])
        if not user_data:
            user_data = [
                {"text": "Hola", "label": 1},
                {"text": "Necesito ayuda", "label": 2},
                {"text": "No entiendo", "label": 0},
            ]
        _enqueue_training_examples(user_data)
        return {"message": "Training data received. Model will be updated asynchronously."}

    if data.get("message"):
        user_id = data.get("user_id")
        text = data["message"]
        language = data.get("language", default_language)
        model_variant = data.get("model_variant", "gpt2")

        conversation_history.setdefault(user_id, []).append(text)
        # Only the three most recent messages are used as context.
        contextualized_text = " ".join(conversation_history[user_id][-3:])

        response = _select_service(model_variant).get_response(
            user_id, contextualized_text, language
        )
        _enqueue_training_examples([{"text": contextualized_text, "label": 0}])
        return {"answer": response}

    raise HTTPException(status_code=400, detail="Request must contain 'train' or 'message'.")


@app.get("/")
async def get_home():
    """Serve the chatbot landing page."""
    # NOTE(review): a fresh id is generated per page load, but the template
    # below contains no placeholder that uses it.
    user_id = str(uuid.uuid4())
    html_code = f""" Chatbot

Chatbot

"""
    return HTMLResponse(content=html_code)


def _pop_training_item(queue_key):
    """Remove and return the oldest item queued under *queue_key*.

    Returns ``None`` when the key is absent or its list is empty.
    """
    raw = RAM_STORAGE.get(queue_key)
    if raw is None:
        return None
    pending = json.loads(raw)
    if not pending:
        return None
    item = pending.pop(0)
    RAM_STORAGE[queue_key] = json.dumps(pending)
    return item


def _run_training_loop(queue_key, train_step):
    """Poll *queue_key* forever, passing each popped item to *train_step*.

    A failing item is reported and skipped so that one bad item does not
    terminate the worker.
    """
    while True:
        check_resource_usage()  # also paces the loop (it samples CPU for 1 s)
        item = _pop_training_item(queue_key)
        if item is None:
            continue
        try:
            train_step(item)
        except Exception as exc:  # worker boundary: log and keep polling
            print(f"Training step for {queue_key} failed: {exc}")


def _apply_gradient_step(optimizer, loss):
    """Back-propagate *loss* and apply a single optimizer update."""
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()


def _publish_service_update(service, registry_key, model, service_tokenizer):
    """Store a freshly trained model in the registry and on its service, then upload."""
    global_models[registry_key] = model
    service.model = model
    service.tokenizer = service_tokenizer
    # NOTE(review): as in the original code, the unified model is pushed
    # together with the task tokenizer; confirm which pair was intended.
    upload_unified_model(unified_model, service_tokenizer)


def _extend_unified_vocab(new_tokens):
    """Add tokens missing from the unified tokenizer and resize the embeddings."""
    known = tokenizer.get_vocab()
    missing = [token for token in new_tokens if token not in known]
    if missing:
        tokenizer.add_tokens(missing)
        # New ids must have embedding rows, otherwise lookups go out of range.
        unified_model.resize_token_embeddings(len(tokenizer))


def train_unified_model():
    """Worker loop: fine-tune the unified classifier on queued /process data."""
    model_name = "unified_model"
    training_args = TrainingArguments(
        output_dir="./results", per_device_train_batch_size=8, num_train_epochs=3
    )

    def fit(examples):
        dataset = SyntheticDataset(tokenizer, examples)
        Trainer(model=unified_model, args=training_args, train_dataset=dataset).train()
        # NOTE(review): this replaces the model object cached under the same
        # key by UnifiedModel.load_model() with a state dict.
        RAM_STORAGE[model_name] = unified_model.state_dict()
        upload_unified_model(unified_model, tokenizer)

    while True:
        check_resource_usage()
        try:
            queued = _pop_training_item("training_queue.json")
            if queued is not None:
                vocab_by_name = queued.get("tokenizers")
                if vocab_by_name:
                    # Only the first tokenizer entry is consulted.
                    _extend_unified_vocab(next(iter(vocab_by_name.values())))
                examples = queued.get("data", [])
                if examples:
                    fit(examples)

            # NOTE(review): this entry is never removed, so when present it is
            # re-trained on every pass of the loop.
            initial_data_path = "initial_data.json"
            if initial_data_path in RAM_STORAGE:
                fit(json.loads(RAM_STORAGE[initial_data_path]))
        except Exception as exc:  # worker boundary: log and keep polling
            print(f"Unified model training failed: {exc}")


def auto_learn_music():
    """Worker loop: attempt a training step on MusicGen for each queued item."""
    if musicgen_model is None or musicgen_tokenizer is None:
        print("MusicGen model or tokenizer unavailable; music training disabled.")
        return

    optimizer = torch.optim.Adam(musicgen_model.parameters(), lr=5e-5)
    loss_fn = torch.nn.CrossEntropyLoss()

    def step(music_training_data):
        inputs = musicgen_tokenizer(
            music_training_data, return_tensors="pt", padding=True
        ).to("cpu")
        musicgen_model.to("cpu")
        musicgen_model.train()
        outputs = musicgen_model(**inputs)
        # NOTE(review): nothing visible here adds a "labels" entry to the
        # tokenizer output, so this lookup is expected to fail until the
        # queue items carry target codes. Confirm the intended item schema.
        loss = loss_fn(outputs.logits, inputs["labels"])
        _apply_gradient_step(optimizer, loss)
        global_models["musicgen-small"] = musicgen_model
        upload_unified_model(unified_model, tokenizer)

    _run_training_loop("music_training_queue.json", step)


def auto_learn_images():
    """Worker loop: attempt a UNet update for every prompt in each queued item."""
    if stable_diffusion_pipeline is None:
        print("Stable Diffusion pipeline unavailable; image training disabled.")
        return

    unet = stable_diffusion_pipeline.unet
    optimizer = torch.optim.Adam(unet.parameters(), lr=1e-5)
    loss_fn = torch.nn.MSELoss()

    def step(image_training_data):
        for image_prompt in image_training_data:
            image = stable_diffusion_pipeline(
                image_prompt,
                guidance_scale=0.0,
                num_inference_steps=4,
                max_sequence_length=256,
                generator=torch.Generator("cpu").manual_seed(0),
            ).images[0]
            image_tensor = torch.tensor(np.array(image)).unsqueeze(0).to("cpu")
            unet.to("cpu")
            unet.train()
            target_tensor = torch.zeros_like(image_tensor)
            # NOTE(review): the UNet is called with a raw image tensor only;
            # a conditional UNet normally also needs a timestep and text
            # embeddings, and the all-zero target looks like a placeholder.
            # Confirm the intended objective against the diffusers docs.
            outputs = unet(image_tensor)
            _apply_gradient_step(optimizer, loss_fn(outputs, target_tensor))
            del image, image_tensor, target_tensor  # Release memory
            gc.collect()
        global_models["stable-diffusion-v1-4"] = stable_diffusion_pipeline
        upload_unified_model(unified_model, tokenizer)

    _run_training_loop("image_training_queue.json", step)


def train_programming_model():
    """Worker loop: causal-LM fine-tuning of CodeGen on queued ``code`` strings."""
    model_name = "codegen-350M-mono"
    model = global_models[model_name]
    code_tokenizer = global_tokenizers[model_name]
    if model is None or code_tokenizer is None:
        print("CodeGen model or tokenizer unavailable; programming training disabled.")
        return

    optimizer = torch.optim.Adam(model.parameters(), lr=5e-5)

    def step(training_data):
        new_code = training_data.get("code", "")
        if not new_code:
            return
        inputs = code_tokenizer(new_code, return_tensors="pt").to("cpu")
        model.to("cpu")
        model.train()
        # Language-model objective: the inputs double as the labels.
        outputs = model(**inputs, labels=inputs["input_ids"])
        _apply_gradient_step(optimizer, outputs.loss)
        _publish_service_update(programming_service, model_name, model, code_tokenizer)

    _run_training_loop("programming_training_queue.json", step)


def train_summarization_model():
    """Worker loop: fine-tune BART on queued ``text`` / ``summary`` pairs."""
    model_name = "bart-large-cnn"
    model = global_models[model_name]
    summary_tokenizer = global_tokenizers[model_name]
    if model is None or summary_tokenizer is None:
        print("BART model or tokenizer unavailable; summarization training disabled.")
        return

    optimizer = torch.optim.Adam(model.parameters(), lr=5e-5)

    def step(training_data):
        new_text = training_data.get("text", "")
        new_summary = training_data.get("summary", "")
        if not (new_text and new_summary):
            return
        inputs = summary_tokenizer(
            new_text, return_tensors="pt", truncation=True, max_length=512
        ).to("cpu")
        labels = summary_tokenizer(
            new_summary, return_tensors="pt", truncation=True, max_length=128
        ).to("cpu")
        model.to("cpu")
        model.train()
        outputs = model(**inputs, labels=labels["input_ids"])
        _apply_gradient_step(optimizer, outputs.loss)
        _publish_service_update(summarization_service, model_name, model, summary_tokenizer)

    _run_training_loop("summarization_training_queue.json", step)


def train_translation_model():
    """Worker loop: fine-tune M2M100 on queued ``text`` items."""
    model_name = "m2m100_418M"
    model = global_models[model_name]
    translation_tokenizer = global_tokenizers[model_name]
    if model is None or translation_tokenizer is None:
        print("M2M100 model or tokenizer unavailable; translation training disabled.")
        return

    optimizer = torch.optim.Adam(model.parameters(), lr=5e-5)

    def step(training_data):
        new_text = training_data.get("text", "")
        target_language = training_data.get("target_language", "en")
        if not new_text:
            return
        inputs = translation_tokenizer(
            new_text, return_tensors="pt", truncation=True, max_length=512
        ).to("cpu")
        # The target-side encoding is prefixed with the target language token,
        # so the requested language has to be set before encoding the labels.
        translation_tokenizer.tgt_lang = target_language
        # NOTE(review): queue items carry no reference translation, so the
        # source text is reused as the target, as in the original code.
        labels = translation_tokenizer(
            text_target=new_text, return_tensors="pt", truncation=True, max_length=512
        ).to("cpu")
        model.to("cpu")
        model.train()
        outputs = model(**inputs, labels=labels["input_ids"])
        _apply_gradient_step(optimizer, outputs.loss)
        _publish_service_update(translation_service, model_name, model, translation_tokenizer)

    _run_training_loop("translation_training_queue.json", step)


def train_sentiment_analysis_model():
    """Worker loop: fine-tune the sentiment classifier on queued ``text`` / ``sentiment``."""
    model_name = "distilbert-base-uncased-finetuned-sst-2-english"
    model = global_models[model_name]
    sentiment_tokenizer = global_tokenizers[model_name]
    if model is None or sentiment_tokenizer is None:
        print("Sentiment model or tokenizer unavailable; sentiment training disabled.")
        return

    optimizer = torch.optim.Adam(model.parameters(), lr=5e-5)

    def step(training_data):
        new_text = training_data.get("text", "")
        sentiment_label = training_data.get("sentiment", 1)
        if not new_text:
            return
        inputs = sentiment_tokenizer(
            new_text, return_tensors="pt", truncation=True, max_length=128
        ).to("cpu")
        labels = torch.tensor([sentiment_label]).to("cpu")
        model.to("cpu")
        model.train()
        outputs = model(**inputs, labels=labels)
        _apply_gradient_step(optimizer, outputs.loss)
        _publish_service_update(
            sentiment_analysis_service, model_name, model, sentiment_tokenizer
        )

    _run_training_loop("sentiment_analysis_training_queue.json", step)


def train_image_editing_model():
    """Worker loop: attempt an img2img training step for each queued item."""
    model_name = "stable-diffusion-2-1-base"
    model = global_models[model_name]
    if model is None:
        print("Img2img pipeline unavailable; image editing training disabled.")
        return

    optimizer = torch.optim.Adam(model.unet.parameters(), lr=5e-5)
    loss_fn = torch.nn.MSELoss()

    def step(training_data):
        global img2img_pipeline
        image_url = training_data.get("image_url")
        prompt = training_data.get("prompt")
        if not (image_url and prompt):
            return
        image = Image.open(requests.get(image_url, stream=True, timeout=30).raw)
        image = image.resize((512, 512))
        model.to("cpu")
        # NOTE(review): ``model`` is a pipeline object and the loss below is
        # given PIL images rather than tensors; this step looks like it
        # cannot run as written. Confirm the intended training objective.
        model.train()
        outputs = model(prompt=prompt, image=image, strength=0.8, guidance_scale=7.5)
        _apply_gradient_step(optimizer, loss_fn(outputs.images[0], image))
        del image  # Release memory
        gc.collect()
        global_models[model_name] = model
        img2img_pipeline = model
        img2img_pipeline.enable_model_cpu_offload()
        upload_unified_model(unified_model, tokenizer)

    _run_training_loop("image_editing_training_queue.json", step)


def upload_unified_model(model, tokenizer):
    """Uploads the unified model and tokenizer to Hugging Face Hub.

    The upload is skipped (with a message) when the ``HF_API_TOKEN``
    environment variable is not set.
    """
    api = HfApi()
    repo_id = "Yhhxhfh/test"
    token = os.getenv("HF_API_TOKEN")
    if not token:
        print("Hugging Face API token not found. Skipping upload.")
        return
    api.create_repo(repo_id=repo_id, token=token, exist_ok=True, private=True)
    # ``token`` supersedes the deprecated ``use_auth_token`` keyword.
    model.push_to_hub(repo_id, token=token)
    tokenizer.push_to_hub(repo_id, token=token)


def check_resource_usage():
    """Monitors and manages resource usage (RAM, CPU, GPU).

    Blocks for about one second while sampling CPU usage. Kills the calling
    process when RAM usage stays above 95% after a cleanup attempt.
    """
    # RAM usage
    if psutil.virtual_memory().percent > 90:
        print("WARNING: High RAM usage detected. Clearing cache...")
        if torch.cuda.is_available():
            torch.cuda.empty_cache()  # Clear GPU cache
        gc.collect()  # Force garbage collection
        # Re-sample so the decision reflects the state after the cleanup.
        if psutil.virtual_memory().percent > 95:
            print("CRITICAL: Extremely high RAM usage. Terminating processes...")
            os.kill(os.getpid(), 9)  # Terminate the current process

    # CPU usage
    cpu_percent = psutil.cpu_percent(interval=1)
    if cpu_percent > 90:
        print("WARNING: High CPU usage detected. Reducing workload...")
        # Implement strategies to reduce CPU load, such as:
        # - Decreasing batch size in training loops
        # - Skipping a training step
        # - Limiting the number of parallel processes

    # GPU usage (if applicable)
    if torch.cuda.is_available():
        # Both quantities are in bytes so that the comparison is meaningful.
        allocated_bytes = torch.cuda.memory_allocated()
        total_bytes = torch.cuda.get_device_properties(0).total_memory
        if allocated_bytes > total_bytes * 0.9:
            print("WARNING: High GPU memory usage detected. Clearing cache...")
            torch.cuda.empty_cache()


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)