import os
import json
import torch
from transformers import (
AutoTokenizer,
AutoModelForSequenceClassification,
AutoModelForCausalLM,
TrainingArguments,
Trainer,
AutoModelForTextToWaveform,
GPT2LMHeadModel,
GPT2Tokenizer,
pipeline,
AutoModelForSeq2SeqLM,
AutoFeatureExtractor,
AutoModelForAudioClassification,
)
from diffusers import StableDiffusionPipeline, StableDiffusionImg2ImgPipeline
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse, FileResponse
import multiprocessing
import uuid
import numpy as np
from PIL import Image
import requests
from dotenv import load_dotenv
from huggingface_hub import HfApi, HfFolder
import psutil
import gc
# Load environment variables from a .env file before anything reads them
# (HF_API_TOKEN is looked up later through os.getenv).
load_dotenv()

app = FastAPI()

# Language code handed to the chat services when a request does not name one.
default_language = "es"

# In-process key/value store used for the JSON training queues, the cached
# unified model and the raw bytes of the most recent generated images.
RAM_STORAGE = dict()

# Checkpoint names of the GPT-2 family exposed as chat "model variants".
gpt2_variants = ["gpt2", "gpt2-medium", "gpt2-large", "gpt2-xl"]

# Registries filled at import time: name -> loaded model / tokenizer (or None
# when the download failed).
global_models = dict()
global_tokenizers = dict()
def download_model(model_class, pretrained_name):
    """Load ``pretrained_name`` through ``model_class.from_pretrained``.

    Two attempts are made: a regular resumable download and, if that raises,
    one more with ``force_download=True`` so a corrupt cache entry is
    replaced. The previous ``while True`` wrapper never iterated twice
    (every path returned), so the attempts are spelled out explicitly.

    Args:
        model_class: Any object exposing ``from_pretrained(name, **kwargs)``.
        pretrained_name: Model identifier passed to ``from_pretrained``.

    Returns:
        The loaded object, or ``None`` when both attempts fail.
    """
    attempts = (
        {},
        {"force_download": True},
    )
    for attempt_number, extra_kwargs in enumerate(attempts, start=1):
        try:
            return model_class.from_pretrained(
                pretrained_name,
                local_files_only=False,
                resume_download=True,
                **extra_kwargs,
            )
        except Exception as error:  # best-effort loader: report and move on
            if attempt_number < len(attempts):
                print(f"Error downloading {pretrained_name}: {error}")
                print("Retrying download...")
            else:
                print(f"Failed to download after retry: {error}")
    return None
# Every GPT-2 variant gets a tokenizer and a causal-LM under the same key.
for variant in gpt2_variants:
    global_tokenizers[variant] = download_model(AutoTokenizer, variant)
    global_models[variant] = download_model(AutoModelForCausalLM, variant)

# Remaining checkpoints, listed in download order as
# (target registry, registry key, loader class, hub repository id).
_STARTUP_DOWNLOADS = (
    (global_models, "musicgen-small", AutoModelForTextToWaveform, "facebook/musicgen-small"),
    (global_tokenizers, "musicgen-small", AutoTokenizer, "facebook/musicgen-small"),
    (global_models, "stable-diffusion-v1-4", StableDiffusionPipeline, "CompVis/stable-diffusion-v1-4"),
    (global_models, "stable-diffusion-2-1", StableDiffusionPipeline, "stabilityai/stable-diffusion-2-1"),
    (global_tokenizers, "codegen-350M-mono", AutoTokenizer, "Salesforce/codegen-350M-mono"),
    (global_models, "codegen-350M-mono", AutoModelForCausalLM, "Salesforce/codegen-350M-mono"),
    (global_tokenizers, "bart-large-cnn", AutoTokenizer, "facebook/bart-large-cnn"),
    (global_models, "bart-large-cnn", AutoModelForSeq2SeqLM, "facebook/bart-large-cnn"),
    (global_tokenizers, "m2m100_418M", AutoTokenizer, "facebook/m2m100_418M"),
    (global_models, "m2m100_418M", AutoModelForSeq2SeqLM, "facebook/m2m100_418M"),
    (global_tokenizers, "wav2vec2-base", AutoFeatureExtractor, "facebook/wav2vec2-base"),
    (global_models, "wav2vec2-base", AutoModelForAudioClassification, "facebook/wav2vec2-base"),
    (
        global_tokenizers,
        "distilbert-base-uncased-finetuned-sst-2-english",
        AutoTokenizer,
        "distilbert-base-uncased-finetuned-sst-2-english",
    ),
    (
        global_models,
        "distilbert-base-uncased-finetuned-sst-2-english",
        AutoModelForSequenceClassification,
        "distilbert-base-uncased-finetuned-sst-2-english",
    ),
    (
        global_models,
        "stable-diffusion-2-1-base",
        StableDiffusionImg2ImgPipeline,
        "stabilityai/stable-diffusion-2-1-base",
    ),
)
for _registry, _key, _loader, _repo_id in _STARTUP_DOWNLOADS:
    _registry[_key] = download_model(_loader, _repo_id)
class ChatbotService:
    """Text-generation wrapper around one model/tokenizer pair.

    Both objects are looked up by name in ``global_models`` and
    ``global_tokenizers``; either may be ``None`` when the entry is missing.
    """

    def __init__(self, model_name="response_model", tokenizer_name="response_tokenizer"):
        self.model_name = model_name
        self.tokenizer_name = tokenizer_name
        self.model = self.load_model()
        self.tokenizer = self.load_tokenizer()

    def get_response(self, user_id, message, language=default_language):
        """Generate a reply to ``message``.

        ``user_id`` and ``language`` are accepted but not used by the body.
        Returns a fixed "not ready" message when the model or tokenizer is
        missing, otherwise the decoded generation with the prompt removed.
        """
        ready = self.model is not None and self.tokenizer is not None
        if not ready:
            return "El modelo aún no está listo. Por favor, inténtelo de nuevo más tarde."

        prompt = f"Usuario: {message} Asistente:"
        prompt_ids = self.tokenizer.encode(prompt, return_tensors="pt").to("cpu")
        generation_options = {
            "max_length": 100,
            "num_beams": 5,
            "no_repeat_ngram_size": 2,
            "early_stopping": True,
        }
        with torch.no_grad():
            generated = self.model.generate(input_ids=prompt_ids, **generation_options)
        decoded = self.tokenizer.decode(generated[0], skip_special_tokens=True)
        # Strip the echoed prompt so only the assistant part remains.
        return decoded.replace(prompt, "").strip()

    def load_model(self):
        """Return the registered model for ``self.model_name`` or ``None``."""
        return global_models.get(self.model_name)

    def load_tokenizer(self):
        """Return the registered tokenizer for ``self.tokenizer_name`` or ``None``."""
        return global_tokenizers.get(self.tokenizer_name)

    def save_model(self):
        """Publish the current model back into ``global_models``."""
        global_models[self.model_name] = self.model

    def save_tokenizer(self):
        """Publish the current tokenizer back into ``global_tokenizers``."""
        global_tokenizers[self.tokenizer_name] = self.tokenizer
# One chat service per GPT-2 variant, keyed by the variant name.
chatbot_services = {
    variant: ChatbotService(model_name=variant, tokenizer_name=variant)
    for variant in gpt2_variants
}

# Task-specific services; model and tokenizer share the same registry key.
_CODEGEN_KEY = "codegen-350M-mono"
_BART_KEY = "bart-large-cnn"
_M2M_KEY = "m2m100_418M"
_SST2_KEY = "distilbert-base-uncased-finetuned-sst-2-english"

programming_service = ChatbotService(model_name=_CODEGEN_KEY, tokenizer_name=_CODEGEN_KEY)
summarization_service = ChatbotService(model_name=_BART_KEY, tokenizer_name=_BART_KEY)
translation_service = ChatbotService(model_name=_M2M_KEY, tokenizer_name=_M2M_KEY)
sentiment_analysis_service = ChatbotService(model_name=_SST2_KEY, tokenizer_name=_SST2_KEY)
class UnifiedModel(AutoModelForSequenceClassification):
    """Three-label sequence classifier built on the ``gpt2`` checkpoint.

    Instances are obtained through :meth:`load_model`, which caches the
    loaded model in ``RAM_STORAGE`` under ``"unified_model"``.
    """

    def __init__(self, config):
        super().__init__(config)

    @staticmethod
    def load_model():
        """Return the cached unified model, loading it on first use.

        ``train_unified_model`` stores a ``state_dict`` under the same
        ``RAM_STORAGE`` key, so a cached value that is not a module is
        treated as weights to load into a freshly built model rather than
        being returned as if it were the model.
        """
        model_name = "unified_model"
        cached = RAM_STORAGE.get(model_name)
        if isinstance(cached, torch.nn.Module):
            return cached

        model = UnifiedModel.from_pretrained("gpt2", num_labels=3)
        # GPT-2 ships without a pad token; the sequence-classification head
        # rejects batches larger than one unless config.pad_token_id is set.
        # The module-level tokenizer uses eos as pad, so mirror that here.
        if model.config.pad_token_id is None:
            model.config.pad_token_id = model.config.eos_token_id
        if cached is not None:
            model.load_state_dict(cached)
        RAM_STORAGE[model_name] = model
        return model
class SyntheticDataset(torch.utils.data.Dataset):
    """Dataset over a list of ``{"text": ..., "label": ...}`` records.

    Each item is tokenized on access, padded/truncated to 128 tokens, and
    returned as ``input_ids`` / ``attention_mask`` tensors plus the raw label.
    """

    def __init__(self, tokenizer, data):
        self.tokenizer, self.data = tokenizer, data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        record = self.data[idx]
        text, label = record["text"], record["label"]
        encoded = self.tokenizer(
            text,
            padding="max_length",
            truncation=True,
            max_length=128,
            return_tensors="pt",
        )
        # The tokenizer returns a leading batch dimension; drop it per field.
        sample = {
            field: encoded[field].squeeze()
            for field in ("input_ids", "attention_mask")
        }
        sample["labels"] = label
        return sample
# Per-user list of messages received by /process, keyed by user_id.
conversation_history = {}

# Name under which the unified tokenizer's vocabulary is queued for training.
tokenizer_name = "unified_tokenizer"
tokenizer = AutoTokenizer.from_pretrained("gpt2")
# GPT-2 defines no pad token; reuse eos so padding="max_length" works.
tokenizer.pad_token = tokenizer.eos_token

unified_model = UnifiedModel.load_model()
unified_model.to(torch.device("cpu"))

musicgen_tokenizer = global_tokenizers["musicgen-small"]
musicgen_model = global_models["musicgen-small"]

stable_diffusion_pipeline = global_models["stable-diffusion-v1-4"]
img2img_pipeline = global_models["stable-diffusion-2-1-base"]
# download_model() returns None when a checkpoint could not be fetched; skip
# the offload call in that case instead of failing the whole import with an
# AttributeError.
for _label, _pipeline in (
    ("stable-diffusion-v1-4", stable_diffusion_pipeline),
    ("stable-diffusion-2-1-base", img2img_pipeline),
):
    if _pipeline is None:
        print(f"Pipeline {_label} is not available. Skipping CPU offload setup.")
        continue
    _pipeline.enable_model_cpu_offload()
@app.on_event("startup")
async def startup_event():
    """Launch one background process per training loop when the app starts.

    The workers are started as daemon processes: every target is an endless
    ``while True`` loop, and non-daemon children would be joined at
    interpreter exit, preventing the server from ever shutting down.

    NOTE(review): each worker is a separate process, and ``RAM_STORAGE`` is
    a plain in-process dict, so queue entries written by the request
    handlers after startup are not expected to be visible to the workers.
    Confirm how the queues are meant to be shared.
    """
    background_jobs = (
        train_unified_model,
        auto_learn_music,
        auto_learn_images,
        train_programming_model,
        train_summarization_model,
        train_translation_model,
        train_sentiment_analysis_model,
        train_image_editing_model,
    )
    for job in background_jobs:
        worker = multiprocessing.Process(target=job, name=job.__name__, daemon=True)
        worker.start()
@app.post("/generate_image")
async def generate_image(request: Request):
    """Generate a PNG from the ``prompt`` field of the JSON request body.

    The image is still written to ``generated_image.png`` and its raw pixel
    bytes are still stored in ``RAM_STORAGE``. The HTTP response, however, is
    served from an in-memory PNG buffer rather than from the shared file: a
    later request could overwrite that file while an earlier response was
    still being streamed from it.

    Raises:
        HTTPException: 400 when ``prompt`` is missing or the body is not a
            JSON object; 503 when the pipeline failed to load.
    """
    import io

    from fastapi.responses import Response

    data = await request.json()
    prompt = data.get("prompt") if isinstance(data, dict) else None
    if not prompt:
        raise HTTPException(status_code=400, detail="Missing 'prompt' in request body.")
    if stable_diffusion_pipeline is None:
        raise HTTPException(status_code=503, detail="Image model is not available.")

    image = stable_diffusion_pipeline(prompt).images[0]

    buffer = io.BytesIO()
    image.save(buffer, format="PNG")
    png_bytes = buffer.getvalue()

    image_path = "generated_image.png"
    with open(image_path, "wb") as handle:
        handle.write(png_bytes)
    RAM_STORAGE[image_path] = image.tobytes()

    del image  # Release memory
    gc.collect()
    return Response(content=png_bytes, media_type="image/png")
@app.post("/edit_image")
async def edit_image(request: Request):
    """Download ``image_url``, edit it according to ``prompt``, return a PNG.

    The edited image is still written to ``edited_image.png`` and its raw
    pixel bytes are still stored in ``RAM_STORAGE``. The response is served
    from an in-memory buffer so concurrent requests cannot overwrite the file
    that is being streamed.

    Raises:
        HTTPException: 400 when a field is missing, the body is not a JSON
            object, or the image cannot be fetched or decoded; 503 when the
            pipeline failed to load.
    """
    import io

    from fastapi.responses import Response

    data = await request.json()
    if not isinstance(data, dict):
        data = {}
    image_url = data.get("image_url")
    prompt = data.get("prompt")
    if not image_url or not prompt:
        raise HTTPException(
            status_code=400, detail="Missing 'image_url' or 'prompt' in request body."
        )
    if img2img_pipeline is None:
        raise HTTPException(status_code=503, detail="Image editing model is not available.")

    # NOTE(security): image_url is caller-controlled, so the server will fetch
    # arbitrary addresses (SSRF risk). Restrict allowed hosts before exposing
    # this endpoint publicly.
    try:
        fetched = requests.get(image_url, timeout=30)
        fetched.raise_for_status()
        # Decode from the fully downloaded body and normalise to 3-channel RGB.
        image = Image.open(io.BytesIO(fetched.content)).convert("RGB")
    except (requests.RequestException, OSError) as error:
        raise HTTPException(
            status_code=400, detail=f"Could not load image from 'image_url': {error}"
        ) from error

    edited_image = img2img_pipeline(prompt=prompt, image=image).images[0]

    buffer = io.BytesIO()
    edited_image.save(buffer, format="PNG")
    png_bytes = buffer.getvalue()

    edited_image_path = "edited_image.png"
    with open(edited_image_path, "wb") as handle:
        handle.write(png_bytes)
    RAM_STORAGE[edited_image_path] = edited_image.tobytes()

    del image, edited_image  # Release memory
    gc.collect()
    return Response(content=png_bytes, media_type="image/png")
def _enqueue_training_sample(samples):
    """Append one job (current tokenizer vocab + ``samples``) to the JSON training queue."""
    training_queue_path = "training_queue.json"
    if training_queue_path in RAM_STORAGE:
        pending_jobs = json.loads(RAM_STORAGE[training_queue_path])
    else:
        pending_jobs = []
    pending_jobs.append(
        {"tokenizers": {tokenizer_name: tokenizer.get_vocab()}, "data": samples}
    )
    RAM_STORAGE[training_queue_path] = json.dumps(pending_jobs)


@app.post("/process")
async def process(request: Request):
    """Queue training data or answer a chat message.

    Request body (JSON object):
        * ``train`` truthy: queue ``user_data`` (or a small built-in sample
          set when it is empty) for asynchronous training.
        * ``message`` truthy: answer it with the service chosen by
          ``model_variant`` (default ``"gpt2"``), using up to the last three
          messages of ``user_id`` as context, and queue that context as a
          training sample with label 0.

    Raises:
        HTTPException: 400 when the body is not a JSON object or carries
            neither ``train`` nor ``message``.
    """
    data = await request.json()
    if not isinstance(data, dict):
        # A JSON list/string/number has no .get(); report it as a bad request.
        raise HTTPException(status_code=400, detail="Request must contain 'train' or 'message'.")

    if data.get("train"):
        user_data = data.get("user_data", [])
        if not user_data:
            user_data = [
                {"text": "Hola", "label": 1},
                {"text": "Necesito ayuda", "label": 2},
                {"text": "No entiendo", "label": 0},
            ]
        _enqueue_training_sample(user_data)
        return {"message": "Training data received. Model will be updated asynchronously."}

    if not data.get("message"):
        raise HTTPException(status_code=400, detail="Request must contain 'train' or 'message'.")

    user_id = data.get("user_id")
    text = data["message"]
    language = data.get("language", default_language)
    model_variant = data.get("model_variant", "gpt2")

    history = conversation_history.setdefault(user_id, [])
    history.append(text)
    # Only the last three messages are ever read, so drop older ones to keep
    # the per-user history from growing without bound.
    del history[:-3]
    contextualized_text = " ".join(history)

    named_services = {
        "programming": programming_service,
        "summarization": summarization_service,
        "translation": translation_service,
        "sentiment_analysis": sentiment_analysis_service,
    }
    if model_variant in chatbot_services:
        service = chatbot_services[model_variant]
    elif model_variant in named_services:
        service = named_services[model_variant]
    else:
        # Unknown variants fall back to the base GPT-2 chat service.
        service = chatbot_services["gpt2"]
    response = service.get_response(user_id, contextualized_text, language)

    _enqueue_training_sample([{"text": contextualized_text, "label": 0}])
    return {"answer": response}
@app.get("/")
async def get_home():
    """Serve the landing page as an HTML response."""
    # A fresh identifier is generated per request; the page body shown here
    # does not embed it.
    user_id = str(uuid.uuid4())
    page_body = "\nChatbot\n"
    return HTMLResponse(content=page_body)
def train_unified_model():
    """Endless loop that fine-tunes the unified classifier from queued jobs.

    Each iteration:
      1. runs ``check_resource_usage()``;
      2. pops the oldest job from ``RAM_STORAGE["training_queue.json"]``,
         adds any vocabulary entries the tokenizer does not know yet, and
         trains on the job's ``data``;
      3. trains on ``RAM_STORAGE["initial_data.json"]`` if that key exists.

    After every training run the model's ``state_dict`` is stored in
    ``RAM_STORAGE`` and the model/tokenizer are uploaded.

    NOTE(review): nothing in this function removes ``initial_data.json``, so
    step 3 repeats on every iteration while the key is present. Confirm that
    is intended.
    """
    global tokenizer, unified_model
    model_name = "unified_model"
    training_args = TrainingArguments(
        output_dir="./results", per_device_train_batch_size=8, num_train_epochs=3
    )

    def fit_and_publish(samples):
        """Train on ``samples``, cache the weights and upload the result."""
        dataset = SyntheticDataset(tokenizer, samples)
        trainer = Trainer(model=unified_model, args=training_args, train_dataset=dataset)
        trainer.train()
        RAM_STORAGE[model_name] = unified_model.state_dict()
        upload_unified_model(unified_model, tokenizer)

    while True:
        check_resource_usage()

        training_queue_path = "training_queue.json"
        if training_queue_path in RAM_STORAGE:
            training_data_list = json.loads(RAM_STORAGE[training_queue_path])
            if training_data_list:
                training_data = training_data_list.pop(0)
                RAM_STORAGE[training_queue_path] = json.dumps(training_data_list)

                tokenizer_data = training_data.get("tokenizers")
                if tokenizer_data:
                    # Only the first queued vocabulary is considered.
                    queued_vocab = next(iter(tokenizer_data.values()))
                    known_tokens = tokenizer.get_vocab()
                    unseen_tokens = [
                        token for token in queued_vocab if token not in known_tokens
                    ]
                    # add_tokens returns how many entries were really added;
                    # the embedding matrix must grow with the vocabulary or
                    # the new ids would index past its end.
                    if unseen_tokens and tokenizer.add_tokens(unseen_tokens):
                        unified_model.resize_token_embeddings(len(tokenizer))

                data = training_data.get("data", [])
                if data:
                    fit_and_publish(data)

        initial_data_path = "initial_data.json"
        if initial_data_path in RAM_STORAGE:
            fit_and_publish(json.loads(RAM_STORAGE[initial_data_path]))
def auto_learn_music():
    """Endless loop that pops jobs from the music training queue and runs one
    optimisation step of the MusicGen model per job.

    Reads and rewrites ``RAM_STORAGE["music_training_queue.json"]`` (a JSON
    list), republishes the model in ``global_models`` and then calls
    ``upload_unified_model``.

    NOTE(review): SOURCE lost its indentation; the nesting below is a
    reconstruction. Confirm against the original file.
    """
    global musicgen_tokenizer, musicgen_model
    while True:
        check_resource_usage()
        music_training_queue_path = "music_training_queue.json"
        if music_training_queue_path in RAM_STORAGE:
            music_training_data_list = json.loads(RAM_STORAGE[music_training_queue_path])
            if music_training_data_list:
                # Take the oldest job and write the shortened queue back.
                music_training_data = music_training_data_list.pop(0)
                RAM_STORAGE[music_training_queue_path] = json.dumps(
                    music_training_data_list
                )
                inputs = musicgen_tokenizer(
                    music_training_data, return_tensors="pt", padding=True
                ).to("cpu")
                musicgen_model.to("cpu")
                musicgen_model.train()
                # A fresh optimizer is built for every job.
                optimizer = torch.optim.Adam(musicgen_model.parameters(), lr=5e-5)
                loss_fn = torch.nn.CrossEntropyLoss()
                for epoch in range(1):
                    outputs = musicgen_model(**inputs)
                    # NOTE(review): `inputs` comes straight from the tokenizer
                    # call above; confirm it really carries a "labels" entry,
                    # otherwise this lookup raises KeyError.
                    loss = loss_fn(outputs.logits, inputs["labels"])
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()
                global_models["musicgen-small"] = musicgen_model
                # NOTE(review): this uploads the unified model and tokenizer,
                # not the MusicGen model trained above. Confirm intent.
                upload_unified_model(unified_model, tokenizer)
def auto_learn_images():
    """Endless loop that pops jobs from the image training queue and, for each
    prompt in a job, generates an image and runs one optimisation step on the
    pipeline's UNet against an all-zero target.

    Reads and rewrites ``RAM_STORAGE["image_training_queue.json"]`` (a JSON
    list whose entries are iterated as prompts).

    NOTE(review): SOURCE lost its indentation; the nesting below is a
    reconstruction. Confirm against the original file.
    """
    global stable_diffusion_pipeline
    while True:
        check_resource_usage()
        image_training_queue_path = "image_training_queue.json"
        if image_training_queue_path in RAM_STORAGE:
            image_training_data_list = json.loads(RAM_STORAGE[image_training_queue_path])
            if image_training_data_list:
                # Take the oldest job and write the shortened queue back.
                image_training_data = image_training_data_list.pop(0)
                RAM_STORAGE[image_training_queue_path] = json.dumps(
                    image_training_data_list
                )
                for image_prompt in image_training_data:
                    # NOTE(review): confirm this pipeline's __call__ accepts
                    # `max_sequence_length`.
                    image = stable_diffusion_pipeline(
                        image_prompt,
                        guidance_scale=0.0,
                        num_inference_steps=4,
                        max_sequence_length=256,
                        generator=torch.Generator("cpu").manual_seed(0),
                    ).images[0]
                    # Generated image -> tensor with a leading batch dimension.
                    image_tensor = torch.tensor(np.array(image)).unsqueeze(0).to("cpu")
                    stable_diffusion_pipeline.unet.to("cpu")
                    stable_diffusion_pipeline.unet.train()
                    # A fresh optimizer is built for every prompt.
                    optimizer = torch.optim.Adam(
                        stable_diffusion_pipeline.unet.parameters(), lr=1e-5
                    )
                    loss_fn = torch.nn.MSELoss()
                    # The regression target is all zeros, shaped like the image.
                    target_tensor = torch.zeros_like(image_tensor)
                    for epoch in range(1):
                        # NOTE(review): the UNet is called with the image tensor
                        # only. Confirm it needs no timestep/conditioning
                        # arguments and that its return value is a tensor
                        # MSELoss can consume.
                        outputs = stable_diffusion_pipeline.unet(image_tensor)
                        loss = loss_fn(outputs, target_tensor)
                        optimizer.zero_grad()
                        loss.backward()
                        optimizer.step()
                    del image, image_tensor, target_tensor # Release memory
                    gc.collect()
                global_models["stable-diffusion-v1-4"] = stable_diffusion_pipeline
                # NOTE(review): this uploads the unified model and tokenizer,
                # not the pipeline trained above. Confirm intent.
                upload_unified_model(unified_model, tokenizer)
def train_programming_model():
    """Endless loop that fine-tunes the CodeGen model on queued code snippets.

    Each job popped from ``RAM_STORAGE["programming_training_queue.json"]``
    may carry a ``code`` string; one causal-LM step (labels = input ids) is
    run on it, then the model is republished in ``global_models`` and on
    ``programming_service``.

    Changes from the earlier version: the unused ``CrossEntropyLoss`` is
    gone (the model computes its own loss), the optimizer is built once so
    Adam's running statistics persist across samples, inputs are truncated
    to the tokenizer's maximum length, and the loop exits early when the
    model or tokenizer failed to load.
    """
    global programming_service
    model_name = "codegen-350M-mono"
    tokenizer_name = "codegen-350M-mono"
    model = global_models[model_name]
    tokenizer = global_tokenizers[tokenizer_name]
    if model is None or tokenizer is None:
        print(f"{model_name} is not loaded. Skipping training loop.")
        return

    model.to("cpu")
    optimizer = torch.optim.Adam(model.parameters(), lr=5e-5)
    training_queue_path = "programming_training_queue.json"

    while True:
        check_resource_usage()
        if training_queue_path not in RAM_STORAGE:
            continue
        training_data_list = json.loads(RAM_STORAGE[training_queue_path])
        if not training_data_list:
            continue
        # Take the oldest job and write the shortened queue back.
        training_data = training_data_list.pop(0)
        RAM_STORAGE[training_queue_path] = json.dumps(training_data_list)

        new_code = training_data.get("code", "")
        if not new_code:
            continue

        inputs = tokenizer(new_code, return_tensors="pt", truncation=True).to("cpu")
        model.train()
        outputs = model(**inputs, labels=inputs["input_ids"])
        optimizer.zero_grad()
        outputs.loss.backward()
        optimizer.step()

        global_models[model_name] = model
        programming_service.model = model
        programming_service.tokenizer = tokenizer
        # NOTE(review): this uploads the unified model together with the
        # CodeGen tokenizer, not the model trained above. Confirm intent.
        upload_unified_model(unified_model, tokenizer)
def train_summarization_model():
    """Endless loop that fine-tunes the BART summarizer on queued pairs.

    Each job popped from ``RAM_STORAGE["summarization_training_queue.json"]``
    must carry both ``text`` and ``summary``; one seq2seq step is run with
    the tokenized summary as labels, then the model is republished in
    ``global_models`` and on ``summarization_service``.

    Changes from the earlier version: the unused ``CrossEntropyLoss`` is
    gone (the model computes its own loss), the optimizer is built once so
    Adam's running statistics persist across samples, and the loop exits
    early when the model or tokenizer failed to load.
    """
    global summarization_service
    model_name = "bart-large-cnn"
    tokenizer_name = "bart-large-cnn"
    model = global_models[model_name]
    tokenizer = global_tokenizers[tokenizer_name]
    if model is None or tokenizer is None:
        print(f"{model_name} is not loaded. Skipping training loop.")
        return

    model.to("cpu")
    optimizer = torch.optim.Adam(model.parameters(), lr=5e-5)
    training_queue_path = "summarization_training_queue.json"

    while True:
        check_resource_usage()
        if training_queue_path not in RAM_STORAGE:
            continue
        training_data_list = json.loads(RAM_STORAGE[training_queue_path])
        if not training_data_list:
            continue
        # Take the oldest job and write the shortened queue back.
        training_data = training_data_list.pop(0)
        RAM_STORAGE[training_queue_path] = json.dumps(training_data_list)

        new_text = training_data.get("text", "")
        new_summary = training_data.get("summary", "")
        if not (new_text and new_summary):
            continue

        inputs = tokenizer(
            new_text, return_tensors="pt", truncation=True, max_length=512
        ).to("cpu")
        labels = tokenizer(
            new_summary, return_tensors="pt", truncation=True, max_length=128
        ).to("cpu")
        model.train()
        outputs = model(**inputs, labels=labels["input_ids"])
        optimizer.zero_grad()
        outputs.loss.backward()
        optimizer.step()

        global_models[model_name] = model
        summarization_service.model = model
        summarization_service.tokenizer = tokenizer
        # NOTE(review): this uploads the unified model together with the BART
        # tokenizer, not the model trained above. Confirm intent.
        upload_unified_model(unified_model, tokenizer)
def train_translation_model():
    """Endless loop that runs one seq2seq step of M2M100 per queued job.

    Each job popped from ``RAM_STORAGE["translation_training_queue.json"]``
    may carry ``text`` and ``target_language`` (default ``"en"``). The model
    is then republished in ``global_models`` and on ``translation_service``.

    Changes from the earlier version: ``target_language`` is now applied to
    the tokenizer before target-side encoding (it was read but never used,
    so target mode ran with whatever ``tgt_lang`` the tokenizer had), the
    unused ``CrossEntropyLoss`` is gone, the optimizer is built once, and the
    loop exits early when the model or tokenizer failed to load.

    NOTE(review): the labels are encoded from ``text`` itself because the
    job format shows no separate translated field. Confirm whether a target
    sentence should be supplied.
    """
    global translation_service
    model_name = "m2m100_418M"
    tokenizer_name = "m2m100_418M"
    model = global_models[model_name]
    tokenizer = global_tokenizers[tokenizer_name]
    if model is None or tokenizer is None:
        print(f"{model_name} is not loaded. Skipping training loop.")
        return

    model.to("cpu")
    optimizer = torch.optim.Adam(model.parameters(), lr=5e-5)
    training_queue_path = "translation_training_queue.json"

    while True:
        check_resource_usage()
        if training_queue_path not in RAM_STORAGE:
            continue
        training_data_list = json.loads(RAM_STORAGE[training_queue_path])
        if not training_data_list:
            continue
        # Take the oldest job and write the shortened queue back.
        training_data = training_data_list.pop(0)
        RAM_STORAGE[training_queue_path] = json.dumps(training_data_list)

        new_text = training_data.get("text", "")
        target_language = training_data.get("target_language", "en")
        if not new_text:
            continue

        inputs = tokenizer(
            new_text, return_tensors="pt", truncation=True, max_length=512
        ).to("cpu")
        # Target-side encoding prefixes the tokenizer's tgt_lang token, so it
        # has to be set before entering target mode.
        tokenizer.tgt_lang = target_language
        with tokenizer.as_target_tokenizer():
            labels = tokenizer(
                new_text,
                return_tensors="pt",
                truncation=True,
                max_length=512,
            ).to("cpu")

        model.train()
        outputs = model(**inputs, labels=labels["input_ids"])
        optimizer.zero_grad()
        outputs.loss.backward()
        optimizer.step()

        global_models[model_name] = model
        translation_service.model = model
        translation_service.tokenizer = tokenizer
        # NOTE(review): this uploads the unified model together with the
        # M2M100 tokenizer, not the model trained above. Confirm intent.
        upload_unified_model(unified_model, tokenizer)
def train_sentiment_analysis_model():
    """Endless loop that fine-tunes the DistilBERT sentiment classifier.

    Each job popped from
    ``RAM_STORAGE["sentiment_analysis_training_queue.json"]`` may carry
    ``text`` and an integer ``sentiment`` label (default ``1``); one
    classification step is run, then the model is republished in
    ``global_models`` and on ``sentiment_analysis_service``.

    Changes from the earlier version: the unused ``CrossEntropyLoss`` is
    gone (the model computes its own loss), the optimizer is built once so
    Adam's running statistics persist across samples, and the loop exits
    early when the model or tokenizer failed to load.
    """
    global sentiment_analysis_service
    model_name = "distilbert-base-uncased-finetuned-sst-2-english"
    tokenizer_name = "distilbert-base-uncased-finetuned-sst-2-english"
    model = global_models[model_name]
    tokenizer = global_tokenizers[tokenizer_name]
    if model is None or tokenizer is None:
        print(f"{model_name} is not loaded. Skipping training loop.")
        return

    model.to("cpu")
    optimizer = torch.optim.Adam(model.parameters(), lr=5e-5)
    training_queue_path = "sentiment_analysis_training_queue.json"

    while True:
        check_resource_usage()
        if training_queue_path not in RAM_STORAGE:
            continue
        training_data_list = json.loads(RAM_STORAGE[training_queue_path])
        if not training_data_list:
            continue
        # Take the oldest job and write the shortened queue back.
        training_data = training_data_list.pop(0)
        RAM_STORAGE[training_queue_path] = json.dumps(training_data_list)

        new_text = training_data.get("text", "")
        sentiment_label = training_data.get("sentiment", 1)
        if not new_text:
            continue

        inputs = tokenizer(
            new_text, return_tensors="pt", truncation=True, max_length=128
        ).to("cpu")
        labels = torch.tensor([sentiment_label]).to("cpu")
        model.train()
        outputs = model(**inputs, labels=labels)
        optimizer.zero_grad()
        outputs.loss.backward()
        optimizer.step()

        global_models[model_name] = model
        sentiment_analysis_service.model = model
        sentiment_analysis_service.tokenizer = tokenizer
        # NOTE(review): this uploads the unified model together with the
        # DistilBERT tokenizer, not the model trained above. Confirm intent.
        upload_unified_model(unified_model, tokenizer)
def train_image_editing_model():
    """Endless loop that pops jobs from the image-editing training queue and
    attempts one optimisation step of the img2img pipeline's UNet per job.

    Each job read from ``RAM_STORAGE["image_editing_training_queue.json"]``
    is expected to carry ``image_url`` and ``prompt``; jobs missing either
    are dropped without training.

    NOTE(review): SOURCE lost its indentation; the nesting below is a
    reconstruction. Confirm against the original file.
    """
    global img2img_pipeline
    model_name = "stable-diffusion-2-1-base"
    model = global_models[model_name]
    while True:
        check_resource_usage()
        training_queue_path = "image_editing_training_queue.json"
        if training_queue_path in RAM_STORAGE:
            training_data_list = json.loads(RAM_STORAGE[training_queue_path])
            if training_data_list:
                # Take the oldest job and write the shortened queue back.
                training_data = training_data_list.pop(0)
                RAM_STORAGE[training_queue_path] = json.dumps(training_data_list)
                image_url = training_data.get("image_url")
                prompt = training_data.get("prompt")
                if image_url and prompt:
                    # The request sets no timeout and the HTTP status is not
                    # checked before the body is decoded.
                    image = Image.open(requests.get(image_url, stream=True).raw)
                    image = image.resize((512, 512))
                    model.to("cpu")
                    # NOTE(review): `model` is the whole pipeline object taken
                    # from global_models; confirm it exposes `.train()`.
                    model.train()
                    optimizer = torch.optim.Adam(model.unet.parameters(), lr=5e-5)
                    loss_fn = torch.nn.MSELoss()
                    for epoch in range(1):
                        outputs = model(
                            prompt=prompt, image=image, strength=0.8, guidance_scale=7.5
                        )
                        # NOTE(review): both arguments look like PIL images,
                        # not tensors; confirm MSELoss and backward() can work
                        # on them.
                        loss = loss_fn(outputs.images[0], image)
                        optimizer.zero_grad()
                        loss.backward()
                        optimizer.step()
                    del image # Release memory
                    gc.collect()
                    global_models[model_name] = model
                    img2img_pipeline = model
                    img2img_pipeline.enable_model_cpu_offload()
                    # NOTE(review): this uploads the unified model and
                    # tokenizer, not the pipeline trained above. Confirm intent.
                    upload_unified_model(unified_model, tokenizer)
def upload_unified_model(model, tokenizer):
    """Push ``model`` and ``tokenizer`` to the Hugging Face Hub repository.

    The access token is read from the ``HF_API_TOKEN`` environment variable.
    Without it nothing is uploaded and a notice is printed instead. The
    target repository is created as private if it does not exist yet.
    """
    api = HfApi()
    repo_id = "Yhhxhfh/test"
    token = os.getenv("HF_API_TOKEN")
    if not token:
        print("Hugging Face API token not found. Skipping upload.")
        return

    api.create_repo(repo_id=repo_id, token=token, exist_ok=True, private=True)
    for artifact in (model, tokenizer):
        artifact.push_to_hub(repo_id, use_auth_token=token)
def check_resource_usage():
    """Monitor RAM, CPU and GPU usage and react to high load.

    * RAM above 90%: clear the CUDA cache and run the garbage collector,
      then measure again. If usage is still above 95%, kill the current
      process. The earlier code reused the reading taken before the
      clean-up, so the "still exceeds" check never saw its effect.
    * CPU above 90% (sampled over one second, which also paces the calling
      loops): print a warning only.
    * GPU: clear the CUDA cache when allocated memory exceeds 90% of device
      0's total memory. Both sides are compared in bytes; the earlier code
      compared megabytes against bytes, so the branch could never trigger.
    """
    # RAM usage
    ram_percent = psutil.virtual_memory().percent
    if ram_percent > 90:
        print("WARNING: High RAM usage detected. Clearing cache...")
        torch.cuda.empty_cache()  # Clear GPU cache
        gc.collect()  # Force garbage collection
        # Re-measure so the decision reflects the clean-up above.
        ram_percent = psutil.virtual_memory().percent
        if ram_percent > 95:
            print("CRITICAL: Extremely high RAM usage. Terminating processes...")
            os.kill(os.getpid(), 9)  # Terminate the current process

    # CPU usage
    cpu_percent = psutil.cpu_percent(interval=1)
    if cpu_percent > 90:
        print("WARNING: High CPU usage detected. Reducing workload...")
        # Possible strategies to reduce CPU load:
        # - Decreasing batch size in training loops
        # - Skipping a training step
        # - Limiting the number of parallel processes

    # GPU usage (if applicable)
    if torch.cuda.is_available():
        allocated_bytes = torch.cuda.memory_allocated()
        total_bytes = torch.cuda.get_device_properties(0).total_memory
        if allocated_bytes > total_bytes * 0.9:
            print("WARNING: High GPU memory usage detected. Clearing cache...")
            torch.cuda.empty_cache()
if __name__ == "__main__":
    # Script entry point: serve the FastAPI app on every interface, port 7860.
    import uvicorn

    _bind_host, _bind_port = "0.0.0.0", 7860
    uvicorn.run(app, host=_bind_host, port=_bind_port)