|
import os
import json
import gc
import multiprocessing
import uuid

import numpy as np
import psutil
import requests
import torch
from PIL import Image
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse, FileResponse
from huggingface_hub import HfApi
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    AutoModelForCausalLM,
    TrainingArguments,
    Trainer,
    AutoModelForTextToWaveform,
    AutoModelForSeq2SeqLM,
    AutoFeatureExtractor,
    AutoModelForAudioClassification,
)
from diffusers import StableDiffusionPipeline, StableDiffusionImg2ImgPipeline

load_dotenv()

app = FastAPI()

default_language = "es" |
|
|
|
RAM_STORAGE = {} |
|
|
|
gpt2_variants = [ |
|
"gpt2", |
|
"gpt2-medium", |
|
"gpt2-large", |
|
"gpt2-xl" |
|
] |
|
|
|
global_models = {} |
|
global_tokenizers = {} |
|
|
|
|
|
def download_model(model_class, pretrained_name):
    """Download a model or tokenizer, retrying once with force_download on failure."""
    try:
        return model_class.from_pretrained(
            pretrained_name, local_files_only=False, resume_download=True
        )
    except Exception as e:
        print(f"Error downloading {pretrained_name}: {e}")
        print("Retrying download...")
        try:
            return model_class.from_pretrained(
                pretrained_name,
                local_files_only=False,
                resume_download=True,
                force_download=True,
            )
        except Exception as e:
            print(f"Failed to download {pretrained_name} after retry: {e}")
            return None
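
# Illustrative usage of the helper above; the return value is None when both
# download attempts fail, so callers should be prepared to handle that:
#   tok = download_model(AutoTokenizer, "gpt2")
#   model = download_model(AutoModelForCausalLM, "gpt2")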
|
|
|
|
|
for variant in gpt2_variants:
    global_tokenizers[variant] = download_model(AutoTokenizer, variant)
    global_models[variant] = download_model(AutoModelForCausalLM, variant)

global_models["musicgen-small"] = download_model(
    AutoModelForTextToWaveform, "facebook/musicgen-small"
)
global_tokenizers["musicgen-small"] = download_model(
    AutoTokenizer, "facebook/musicgen-small"
)
global_models["stable-diffusion-v1-4"] = download_model(
    StableDiffusionPipeline, "CompVis/stable-diffusion-v1-4"
)
global_models["stable-diffusion-2-1"] = download_model(
    StableDiffusionPipeline, "stabilityai/stable-diffusion-2-1"
)
global_tokenizers["codegen-350M-mono"] = download_model(
    AutoTokenizer, "Salesforce/codegen-350M-mono"
)
global_models["codegen-350M-mono"] = download_model(
    AutoModelForCausalLM, "Salesforce/codegen-350M-mono"
)
global_tokenizers["bart-large-cnn"] = download_model(
    AutoTokenizer, "facebook/bart-large-cnn"
)
global_models["bart-large-cnn"] = download_model(
    AutoModelForSeq2SeqLM, "facebook/bart-large-cnn"
)
global_tokenizers["m2m100_418M"] = download_model(
    AutoTokenizer, "facebook/m2m100_418M"
)
global_models["m2m100_418M"] = download_model(
    AutoModelForSeq2SeqLM, "facebook/m2m100_418M"
)
global_tokenizers["wav2vec2-base"] = download_model(
    AutoFeatureExtractor, "facebook/wav2vec2-base"
)
global_models["wav2vec2-base"] = download_model(
    AutoModelForAudioClassification, "facebook/wav2vec2-base"
)
global_tokenizers[
    "distilbert-base-uncased-finetuned-sst-2-english"
] = download_model(AutoTokenizer, "distilbert-base-uncased-finetuned-sst-2-english")
global_models["distilbert-base-uncased-finetuned-sst-2-english"] = download_model(
    AutoModelForSequenceClassification,
    "distilbert-base-uncased-finetuned-sst-2-english",
)
global_models["stable-diffusion-2-1-base"] = download_model(
    StableDiffusionImg2ImgPipeline, "stabilityai/stable-diffusion-2-1-base"
)

class ChatbotService:
    def __init__(self, model_name="response_model", tokenizer_name="response_tokenizer"):
        self.model_name = model_name
        self.tokenizer_name = tokenizer_name
        self.model = self.load_model()
        self.tokenizer = self.load_tokenizer()

    def get_response(self, user_id, message, language=default_language):
        if self.model is None or self.tokenizer is None:
            return "El modelo aún no está listo. Por favor, inténtelo de nuevo más tarde."
        input_text = f"Usuario: {message} Asistente:"
        input_ids = self.tokenizer.encode(input_text, return_tensors="pt").to("cpu")
        with torch.no_grad():
            output = self.model.generate(
                input_ids=input_ids,
                max_length=100,
                num_beams=5,
                no_repeat_ngram_size=2,
                early_stopping=True,
            )
        response = self.tokenizer.decode(output[0], skip_special_tokens=True)
        response = response.replace(input_text, "").strip()
        return response

    def load_model(self):
        return global_models.get(self.model_name)

    def load_tokenizer(self):
        return global_tokenizers.get(self.tokenizer_name)

    def save_model(self):
        global_models[self.model_name] = self.model

    def save_tokenizer(self):
        global_tokenizers[self.tokenizer_name] = self.tokenizer
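
# Illustrative usage of ChatbotService (the "gpt2" entries are populated by the
# download loop above; user_id is only bookkeeping supplied by the caller):
#   bot = ChatbotService(model_name="gpt2", tokenizer_name="gpt2")
#   print(bot.get_response(user_id="demo", message="Hola"))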
|
|
|
|
|
chatbot_services = {}
for variant in gpt2_variants:
    chatbot_services[variant] = ChatbotService(model_name=variant, tokenizer_name=variant)

programming_service = ChatbotService(
    model_name="codegen-350M-mono", tokenizer_name="codegen-350M-mono"
)
summarization_service = ChatbotService(
    model_name="bart-large-cnn", tokenizer_name="bart-large-cnn"
)
translation_service = ChatbotService(
    model_name="m2m100_418M", tokenizer_name="m2m100_418M"
)
sentiment_analysis_service = ChatbotService(
    model_name="distilbert-base-uncased-finetuned-sst-2-english",
    tokenizer_name="distilbert-base-uncased-finetuned-sst-2-english",
)

class UnifiedModel:
    """Keeps a single sequence-classification model cached in RAM_STORAGE.

    AutoModel* classes cannot be subclassed or instantiated directly, so the
    concrete model is loaded through the Auto factory instead.
    """

    @staticmethod
    def load_model():
        model_name = "unified_model"
        if model_name in RAM_STORAGE:
            return RAM_STORAGE[model_name]
        model = AutoModelForSequenceClassification.from_pretrained("gpt2", num_labels=3)
        RAM_STORAGE[model_name] = model
        return model

class SyntheticDataset(torch.utils.data.Dataset):
    def __init__(self, tokenizer, data):
        self.tokenizer = tokenizer
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        item = self.data[idx]
        text = item["text"]
        label = item["label"]
        tokens = self.tokenizer(
            text, padding="max_length", truncation=True, max_length=128, return_tensors="pt"
        )
        return {
            "input_ids": tokens["input_ids"].squeeze(),
            "attention_mask": tokens["attention_mask"].squeeze(),
            "labels": label,
        }

conversation_history = {}

tokenizer_name = "unified_tokenizer"
tokenizer = AutoTokenizer.from_pretrained("gpt2")
tokenizer.pad_token = tokenizer.eos_token
unified_model = UnifiedModel.load_model()
unified_model.to(torch.device("cpu"))
# GPT-2 based classifiers need an explicit pad token id to handle padded batches.
unified_model.config.pad_token_id = tokenizer.pad_token_id
musicgen_tokenizer = global_tokenizers["musicgen-small"]
musicgen_model = global_models["musicgen-small"]
stable_diffusion_pipeline = global_models["stable-diffusion-v1-4"]

img2img_pipeline = global_models["stable-diffusion-2-1-base"]
if torch.cuda.is_available():
    # CPU offload only applies when a GPU is present; otherwise everything stays on CPU.
    img2img_pipeline.enable_model_cpu_offload()

@app.on_event("startup") |
|
async def startup_event(): |
|
auto_learn_process = multiprocessing.Process(target=train_unified_model) |
|
auto_learn_process.start() |
|
music_training_process = multiprocessing.Process(target=auto_learn_music) |
|
music_training_process.start() |
|
image_training_process = multiprocessing.Process(target=auto_learn_images) |
|
image_training_process.start() |
|
programming_training_process = multiprocessing.Process(target=train_programming_model) |
|
programming_training_process.start() |
|
summarization_process = multiprocessing.Process(target=train_summarization_model) |
|
summarization_process.start() |
|
translation_process = multiprocessing.Process(target=train_translation_model) |
|
translation_process.start() |
|
sentiment_analysis_process = multiprocessing.Process( |
|
target=train_sentiment_analysis_model |
|
) |
|
sentiment_analysis_process.start() |
|
image_editing_process = multiprocessing.Process(target=train_image_editing_model) |
|
image_editing_process.start() |
|
|
|
|
|
@app.post("/generate_image") |
|
async def generate_image(request: Request): |
|
data = await request.json() |
|
prompt = data.get("prompt") |
|
if not prompt: |
|
raise HTTPException(status_code=400, detail="Missing 'prompt' in request body.") |
|
image = stable_diffusion_pipeline(prompt).images[0] |
|
image_path = "generated_image.png" |
|
image.save(image_path) |
|
RAM_STORAGE[image_path] = image.tobytes() |
|
del image |
|
gc.collect() |
|
return FileResponse(image_path, media_type="image/png") |
|
|
|
|
|
@app.post("/edit_image") |
|
async def edit_image(request: Request): |
|
data = await request.json() |
|
image_url = data.get("image_url") |
|
prompt = data.get("prompt") |
|
if not image_url or not prompt: |
|
raise HTTPException( |
|
status_code=400, detail="Missing 'image_url' or 'prompt' in request body." |
|
) |
|
image = Image.open(requests.get(image_url, stream=True).raw) |
|
edited_image = img2img_pipeline(prompt=prompt, image=image).images[0] |
|
edited_image_path = "edited_image.png" |
|
edited_image.save(edited_image_path) |
|
RAM_STORAGE[edited_image_path] = edited_image.tobytes() |
|
del image, edited_image |
|
gc.collect() |
|
return FileResponse(edited_image_path, media_type="image/png") |
|
|
|
|
|
@app.post("/process") |
|
async def process(request: Request): |
|
global tokenizer, unified_model, chatbot_services |
|
data = await request.json() |
|
if data.get("train"): |
|
user_data = data.get("user_data", []) |
|
if not user_data: |
|
user_data = [ |
|
{"text": "Hola", "label": 1}, |
|
{"text": "Necesito ayuda", "label": 2}, |
|
{"text": "No entiendo", "label": 0}, |
|
] |
|
training_queue_path = "training_queue.json" |
|
if training_queue_path in RAM_STORAGE: |
|
existing_data = json.loads(RAM_STORAGE[training_queue_path]) |
|
else: |
|
existing_data = [] |
|
RAM_STORAGE[training_queue_path] = json.dumps( |
|
existing_data |
|
+ [{"tokenizers": {tokenizer_name: tokenizer.get_vocab()}, "data": user_data}] |
|
) |
|
return {"message": "Training data received. Model will be updated asynchronously."} |
|
elif data.get("message"): |
|
user_id = data.get("user_id") |
|
text = data["message"] |
|
language = data.get("language", default_language) |
|
model_variant = data.get("model_variant", "gpt2") |
|
if user_id not in conversation_history: |
|
conversation_history[user_id] = [] |
|
conversation_history[user_id].append(text) |
|
contextualized_text = " ".join(conversation_history[user_id][-3:]) |
|
if model_variant in chatbot_services: |
|
response = chatbot_services[model_variant].get_response( |
|
user_id, contextualized_text, language |
|
) |
|
elif model_variant == "programming": |
|
response = programming_service.get_response( |
|
user_id, contextualized_text, language |
|
) |
|
elif model_variant == "summarization": |
|
response = summarization_service.get_response( |
|
user_id, contextualized_text, language |
|
) |
|
elif model_variant == "translation": |
|
response = translation_service.get_response( |
|
user_id, contextualized_text, language |
|
) |
|
elif model_variant == "sentiment_analysis": |
|
response = sentiment_analysis_service.get_response( |
|
user_id, contextualized_text, language |
|
) |
|
else: |
|
response = chatbot_services["gpt2"].get_response( |
|
user_id, contextualized_text, language |
|
) |
|
training_queue_path = "training_queue.json" |
|
if training_queue_path in RAM_STORAGE: |
|
existing_data = json.loads(RAM_STORAGE[training_queue_path]) |
|
else: |
|
existing_data = [] |
|
RAM_STORAGE[training_queue_path] = json.dumps( |
|
existing_data |
|
+ [ |
|
{ |
|
"tokenizers": {tokenizer_name: tokenizer.get_vocab()}, |
|
"data": [{"text": contextualized_text, "label": 0}], |
|
} |
|
] |
|
) |
|
return {"answer": response} |
|
else: |
|
raise HTTPException(status_code=400, detail="Request must contain 'train' or 'message'.") |
|
|
|
|
|
@app.get("/") |
|
async def get_home(): |
|
user_id = str(uuid.uuid4()) |
|
html_code = f""" |
|
<!DOCTYPE html> |
|
<html> |
|
<head> |
|
<meta charset="UTF-8"> |
|
<title>Chatbot</title> |
|
<style> |
|
body {{font-family: 'Arial', sans-serif;background-color: #f4f4f9;margin: 0;padding: 0;display: flex;align-items: center;justify-content: center;min-height: 100vh;}} |
|
.container {{background-color: #fff;border-radius: 10px;box-shadow: 0 2px 5px rgba(0, 0, 0, 0.1);overflow: hidden;width: 400px;max-width: 90%;}} |
|
h1 {{color: #333;text-align: center;padding: 20px;margin: 0;background-color: #f8f9fa;border-bottom: 1px solid #eee;}} |
|
#chatbox {{height: 300px;overflow-y: auto;padding: 10px;border-bottom: 1px solid #eee;}} |
|
.message {{margin-bottom: 10px;padding: 10px;border-radius: 5px;}} |
|
.message.user {{background-color: #e1f5fe;text-align: right;}} |
|
.message.bot {{background-color: #f1f1f1;text-align: left;}} |
|
#input {{display: flex;padding: 10px;}} |
|
#input textarea {{flex: 1;padding: 10px;border: 1px solid #ddd;border-radius: 4px;margin-right: 10px;}} |
|
#input button {{padding: 10px 20px;border: none;border-radius: 4px;background-color: #007bff;color: #fff;cursor: pointer;}} |
|
#input button:hover {{background-color: #0056b3;}} |
|
</style> |
|
</head> |
|
<body> |
|
<div class="container"> |
|
<h1>Chatbot</h1> |
|
<div id="chatbox"></div> |
|
<div id="input"> |
|
<textarea id="message" rows="3" placeholder="Escribe tu mensaje aquí..."></textarea> |
|
<button id="send">Enviar</button> |
|
</div> |
|
</div> |
|
<script> |
|
const chatbox = document.getElementById('chatbox'); |
|
const messageInput = document.getElementById('message'); |
|
const sendButton = document.getElementById('send'); |
|
function appendMessage(text, sender) {{ |
|
const messageDiv = document.createElement('div'); |
|
messageDiv.classList.add('message', sender); |
|
messageDiv.textContent = text; |
|
chatbox.appendChild(messageDiv); |
|
chatbox.scrollTop = chatbox.scrollHeight; |
|
}} |
|
async function sendMessage() {{ |
|
const message = messageInput.value; |
|
if (!message.trim()) return; |
|
appendMessage(message, 'user'); |
|
messageInput.value = ''; |
|
const response = await fetch('/process', {{ |
|
method: 'POST', |
|
headers: {{'Content-Type': 'application/json'}}, |
|
body: JSON.stringify({{message: message, user_id: '{user_id}'}}) |
|
}}); |
|
const data = await response.json(); |
|
appendMessage(data.answer, 'bot'); |
|
}} |
|
sendButton.addEventListener('click', sendMessage); |
|
messageInput.addEventListener('keypress', (e) => {{ |
|
if (e.key === 'Enter' && !e.shiftKey) {{ |
|
e.preventDefault(); |
|
sendMessage(); |
|
}} |
|
}}); |
|
</script> |
|
</body> |
|
</html> |
|
""" |
|
return HTMLResponse(content=html_code) |
|
|
|
|
|
def train_unified_model():
    global tokenizer, unified_model
    model_name = "unified_model"
    training_args = TrainingArguments(
        output_dir="./results", per_device_train_batch_size=8, num_train_epochs=3
    )
    while True:
        check_resource_usage()
        training_queue_path = "training_queue.json"
        if training_queue_path in RAM_STORAGE:
            training_data_list = json.loads(RAM_STORAGE[training_queue_path])
            if training_data_list:
                training_data = training_data_list.pop(0)
                RAM_STORAGE[training_queue_path] = json.dumps(training_data_list)
                tokenizer_data = training_data.get("tokenizers")
                if tokenizer_data:
                    tokenizer_name = list(tokenizer_data.keys())[0]
                    existing_tokens = tokenizer.get_vocab()
                    new_tokens = tokenizer_data[tokenizer_name]
                    for token, token_id in new_tokens.items():
                        if token not in existing_tokens:
                            tokenizer.add_tokens([token])
                    # Keep the embedding matrix in sync with any newly added tokens.
                    unified_model.resize_token_embeddings(len(tokenizer))
                data = training_data.get("data", [])
                if data:
                    dataset = SyntheticDataset(tokenizer, data)
                    trainer = Trainer(
                        model=unified_model, args=training_args, train_dataset=dataset
                    )
                    trainer.train()
                    RAM_STORAGE[model_name] = unified_model.state_dict()
                    upload_unified_model(unified_model, tokenizer)

        initial_data_path = "initial_data.json"
        if initial_data_path in RAM_STORAGE:
            # Train once on the seed data, then drop it so it is not replayed forever.
            initial_data = json.loads(RAM_STORAGE.pop(initial_data_path))
            dataset = SyntheticDataset(tokenizer, initial_data)
            trainer = Trainer(
                model=unified_model, args=training_args, train_dataset=dataset
            )
            trainer.train()
            RAM_STORAGE[model_name] = unified_model.state_dict()
            upload_unified_model(unified_model, tokenizer)

def auto_learn_music():
    global musicgen_tokenizer, musicgen_model
    while True:
        check_resource_usage()
        music_training_queue_path = "music_training_queue.json"
        if music_training_queue_path in RAM_STORAGE:
            music_training_data_list = json.loads(RAM_STORAGE[music_training_queue_path])
            if music_training_data_list:
                music_training_data = music_training_data_list.pop(0)
                RAM_STORAGE[music_training_queue_path] = json.dumps(
                    music_training_data_list
                )
                inputs = musicgen_tokenizer(
                    music_training_data, return_tensors="pt", padding=True
                ).to("cpu")
                musicgen_model.to("cpu")
                musicgen_model.train()
                optimizer = torch.optim.Adam(musicgen_model.parameters(), lr=5e-5)
                for epoch in range(1):
                    # MusicGen needs target audio codes as labels to compute a
                    # loss; this queue only carries text prompts, so the update
                    # is skipped instead of crashing when no labels are present.
                    if "labels" not in inputs:
                        break
                    outputs = musicgen_model(**inputs)
                    loss = outputs.loss
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()
                global_models["musicgen-small"] = musicgen_model
                upload_unified_model(unified_model, tokenizer)

def auto_learn_images():
    global stable_diffusion_pipeline
    pipe = stable_diffusion_pipeline
    while True:
        check_resource_usage()
        image_training_queue_path = "image_training_queue.json"
        if image_training_queue_path in RAM_STORAGE:
            image_training_data_list = json.loads(RAM_STORAGE[image_training_queue_path])
            if image_training_data_list:
                image_training_data = image_training_data_list.pop(0)
                RAM_STORAGE[image_training_queue_path] = json.dumps(
                    image_training_data_list
                )
                pipe.unet.to("cpu")
                pipe.unet.train()
                optimizer = torch.optim.Adam(pipe.unet.parameters(), lr=1e-5)
                for image_prompt in image_training_data:
                    # Generate an image for the prompt, then run one simplified
                    # noise-prediction training step on it. This is a rough
                    # self-training loop, not a full fine-tuning recipe.
                    image = pipe(
                        image_prompt,
                        guidance_scale=0.0,
                        num_inference_steps=4,
                        generator=torch.Generator("cpu").manual_seed(0),
                    ).images[0]
                    pixel_values = (
                        torch.tensor(np.array(image), dtype=torch.float32)
                        .permute(2, 0, 1)
                        .unsqueeze(0)
                        / 127.5
                        - 1.0
                    )
                    with torch.no_grad():
                        latents = pipe.vae.encode(pixel_values).latent_dist.sample()
                        latents = latents * pipe.vae.config.scaling_factor
                        text_inputs = pipe.tokenizer(
                            image_prompt,
                            padding="max_length",
                            truncation=True,
                            max_length=pipe.tokenizer.model_max_length,
                            return_tensors="pt",
                        )
                        encoder_hidden_states = pipe.text_encoder(text_inputs.input_ids)[0]
                    noise = torch.randn_like(latents)
                    timesteps = torch.randint(
                        0, pipe.scheduler.config.num_train_timesteps, (1,)
                    )
                    noisy_latents = pipe.scheduler.add_noise(latents, noise, timesteps)
                    noise_pred = pipe.unet(
                        noisy_latents, timesteps, encoder_hidden_states
                    ).sample
                    loss = torch.nn.functional.mse_loss(noise_pred, noise)
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()
                    del image, pixel_values, latents
                    gc.collect()
                global_models["stable-diffusion-v1-4"] = pipe
                upload_unified_model(unified_model, tokenizer)

def train_programming_model():
    global programming_service
    model_name = "codegen-350M-mono"
    tokenizer_name = "codegen-350M-mono"
    model = global_models[model_name]
    tokenizer = global_tokenizers[tokenizer_name]
    while True:
        check_resource_usage()
        training_queue_path = "programming_training_queue.json"
        if training_queue_path in RAM_STORAGE:
            training_data_list = json.loads(RAM_STORAGE[training_queue_path])
            if training_data_list:
                training_data = training_data_list.pop(0)
                RAM_STORAGE[training_queue_path] = json.dumps(training_data_list)
                new_code = training_data.get("code", "")
                if new_code:
                    inputs = tokenizer(new_code, return_tensors="pt").to("cpu")
                    model.to("cpu")
                    model.train()
                    optimizer = torch.optim.Adam(model.parameters(), lr=5e-5)
                    for epoch in range(1):
                        # Causal LM loss: the input ids double as the labels.
                        outputs = model(**inputs, labels=inputs["input_ids"])
                        loss = outputs.loss
                        optimizer.zero_grad()
                        loss.backward()
                        optimizer.step()
                    global_models[model_name] = model
                    programming_service.model = model
                    programming_service.tokenizer = tokenizer
                    upload_unified_model(unified_model, tokenizer)

def train_summarization_model():
    global summarization_service
    model_name = "bart-large-cnn"
    tokenizer_name = "bart-large-cnn"
    model = global_models[model_name]
    tokenizer = global_tokenizers[tokenizer_name]
    while True:
        check_resource_usage()
        training_queue_path = "summarization_training_queue.json"
        if training_queue_path in RAM_STORAGE:
            training_data_list = json.loads(RAM_STORAGE[training_queue_path])
            if training_data_list:
                training_data = training_data_list.pop(0)
                RAM_STORAGE[training_queue_path] = json.dumps(training_data_list)
                new_text = training_data.get("text", "")
                new_summary = training_data.get("summary", "")
                if new_text and new_summary:
                    inputs = tokenizer(
                        new_text, return_tensors="pt", truncation=True, max_length=512
                    ).to("cpu")
                    labels = tokenizer(
                        new_summary, return_tensors="pt", truncation=True, max_length=128
                    ).to("cpu")
                    model.to("cpu")
                    model.train()
                    optimizer = torch.optim.Adam(model.parameters(), lr=5e-5)
                    for epoch in range(1):
                        outputs = model(**inputs, labels=labels["input_ids"])
                        loss = outputs.loss
                        optimizer.zero_grad()
                        loss.backward()
                        optimizer.step()
                    global_models[model_name] = model
                    summarization_service.model = model
                    summarization_service.tokenizer = tokenizer
                    upload_unified_model(unified_model, tokenizer)

def train_translation_model():
    global translation_service
    model_name = "m2m100_418M"
    tokenizer_name = "m2m100_418M"
    model = global_models[model_name]
    tokenizer = global_tokenizers[tokenizer_name]
    while True:
        check_resource_usage()
        training_queue_path = "translation_training_queue.json"
        if training_queue_path in RAM_STORAGE:
            training_data_list = json.loads(RAM_STORAGE[training_queue_path])
            if training_data_list:
                training_data = training_data_list.pop(0)
                RAM_STORAGE[training_queue_path] = json.dumps(training_data_list)
                new_text = training_data.get("text", "")
                target_language = training_data.get("target_language", "en")
                if new_text:
                    # The queue only carries the source text, so it is reused as
                    # its own target; tgt_lang controls the target-language tokens
                    # produced inside the as_target_tokenizer context.
                    tokenizer.tgt_lang = target_language
                    inputs = tokenizer(
                        new_text, return_tensors="pt", truncation=True, max_length=512
                    ).to("cpu")
                    with tokenizer.as_target_tokenizer():
                        labels = tokenizer(
                            new_text,
                            return_tensors="pt",
                            truncation=True,
                            max_length=512,
                        ).to("cpu")
                    model.to("cpu")
                    model.train()
                    optimizer = torch.optim.Adam(model.parameters(), lr=5e-5)
                    for epoch in range(1):
                        outputs = model(**inputs, labels=labels["input_ids"])
                        loss = outputs.loss
                        optimizer.zero_grad()
                        loss.backward()
                        optimizer.step()
                    global_models[model_name] = model
                    translation_service.model = model
                    translation_service.tokenizer = tokenizer
                    upload_unified_model(unified_model, tokenizer)

def train_sentiment_analysis_model():
    global sentiment_analysis_service
    model_name = "distilbert-base-uncased-finetuned-sst-2-english"
    tokenizer_name = "distilbert-base-uncased-finetuned-sst-2-english"
    model = global_models[model_name]
    tokenizer = global_tokenizers[tokenizer_name]
    while True:
        check_resource_usage()
        training_queue_path = "sentiment_analysis_training_queue.json"
        if training_queue_path in RAM_STORAGE:
            training_data_list = json.loads(RAM_STORAGE[training_queue_path])
            if training_data_list:
                training_data = training_data_list.pop(0)
                RAM_STORAGE[training_queue_path] = json.dumps(training_data_list)
                new_text = training_data.get("text", "")
                sentiment_label = training_data.get("sentiment", 1)
                if new_text:
                    inputs = tokenizer(
                        new_text, return_tensors="pt", truncation=True, max_length=128
                    ).to("cpu")
                    labels = torch.tensor([sentiment_label]).to("cpu")
                    model.to("cpu")
                    model.train()
                    optimizer = torch.optim.Adam(model.parameters(), lr=5e-5)
                    for epoch in range(1):
                        outputs = model(**inputs, labels=labels)
                        loss = outputs.loss
                        optimizer.zero_grad()
                        loss.backward()
                        optimizer.step()
                    global_models[model_name] = model
                    sentiment_analysis_service.model = model
                    sentiment_analysis_service.tokenizer = tokenizer
                    upload_unified_model(unified_model, tokenizer)

def train_image_editing_model():
    global img2img_pipeline
    model_name = "stable-diffusion-2-1-base"
    pipe = global_models[model_name]
    while True:
        check_resource_usage()
        training_queue_path = "image_editing_training_queue.json"
        if training_queue_path in RAM_STORAGE:
            training_data_list = json.loads(RAM_STORAGE[training_queue_path])
            if training_data_list:
                training_data = training_data_list.pop(0)
                RAM_STORAGE[training_queue_path] = json.dumps(training_data_list)
                image_url = training_data.get("image_url")
                prompt = training_data.get("prompt")
                if image_url and prompt:
                    image = Image.open(requests.get(image_url, stream=True).raw)
                    image = image.convert("RGB").resize((512, 512))
                    pipe.unet.to("cpu")
                    pipe.unet.train()
                    optimizer = torch.optim.Adam(pipe.unet.parameters(), lr=5e-5)
                    # One simplified noise-prediction step on the (image, prompt)
                    # pair, mirroring the loop in auto_learn_images.
                    pixel_values = (
                        torch.tensor(np.array(image), dtype=torch.float32)
                        .permute(2, 0, 1)
                        .unsqueeze(0)
                        / 127.5
                        - 1.0
                    )
                    with torch.no_grad():
                        latents = pipe.vae.encode(pixel_values).latent_dist.sample()
                        latents = latents * pipe.vae.config.scaling_factor
                        text_inputs = pipe.tokenizer(
                            prompt,
                            padding="max_length",
                            truncation=True,
                            max_length=pipe.tokenizer.model_max_length,
                            return_tensors="pt",
                        )
                        encoder_hidden_states = pipe.text_encoder(text_inputs.input_ids)[0]
                    noise = torch.randn_like(latents)
                    timesteps = torch.randint(
                        0, pipe.scheduler.config.num_train_timesteps, (1,)
                    )
                    noisy_latents = pipe.scheduler.add_noise(latents, noise, timesteps)
                    noise_pred = pipe.unet(
                        noisy_latents, timesteps, encoder_hidden_states
                    ).sample
                    loss = torch.nn.functional.mse_loss(noise_pred, noise)
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()
                    del image, pixel_values, latents
                    gc.collect()
                    global_models[model_name] = pipe
                    img2img_pipeline = pipe
                    if torch.cuda.is_available():
                        img2img_pipeline.enable_model_cpu_offload()
                    upload_unified_model(unified_model, tokenizer)

def upload_unified_model(model, tokenizer):
    """Upload the unified model and tokenizer to the Hugging Face Hub."""
    api = HfApi()
    repo_id = "Yhhxhfh/test"
    token = os.getenv("HF_API_TOKEN")
    if token:
        api.create_repo(repo_id=repo_id, token=token, exist_ok=True, private=True)
        model.push_to_hub(repo_id, token=token)
        tokenizer.push_to_hub(repo_id, token=token)
    else:
        print("Hugging Face API token not found. Skipping upload.")
|
|
|
def check_resource_usage():
    """Monitor RAM, CPU and GPU usage and react when thresholds are exceeded."""
    ram_percent = psutil.virtual_memory().percent
    if ram_percent > 90:
        print("WARNING: High RAM usage detected. Clearing cache...")
        torch.cuda.empty_cache()
        gc.collect()
        if ram_percent > 95:
            print("CRITICAL: Extremely high RAM usage. Terminating process...")
            os.kill(os.getpid(), 9)

    cpu_percent = psutil.cpu_percent(interval=1)
    if cpu_percent > 90:
        print("WARNING: High CPU usage detected. Reducing workload...")

    if torch.cuda.is_available():
        # Compare allocated bytes against total device memory (both in bytes).
        gpu_memory = torch.cuda.memory_allocated()
        if gpu_memory > torch.cuda.get_device_properties(0).total_memory * 0.9:
            print("WARNING: High GPU memory usage detected. Clearing cache...")
            torch.cuda.empty_cache()

if __name__ == "__main__": |
|
import uvicorn |
|
|
|
uvicorn.run(app, host="0.0.0.0", port=7860) |