# Hhhhvasasasaggc / app.py
# xfcxcxcdfdfd's picture
# Update app.py
# dcd7efc verified
import cachetools
from pydantic import BaseModel
from llama_cpp import Llama
from concurrent.futures import ThreadPoolExecutor, as_completed
import re
import httpx
import asyncio
import gradio as gr
import os
from dotenv import load_dotenv
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import uvicorn
from threading import Thread
import gptcache
# Load the .env file before reading the environment so the token lookup
# below can see values defined there.
load_dotenv()
HUGGINGFACE_TOKEN = os.environ.get("HUGGINGFACE_TOKEN")

# Shared response cache used by cache_response: at most 100 entries,
# each with a TTL of 60.
cache = cachetools.TTLCache(maxsize=100, ttl=60)
# Module-wide registry. In this file only 'models' is assigned later (after
# the model manager finishes loading); 'tokens' maps short token names to
# placeholder strings, and every remaining entry starts as its own empty dict.
global_data = {
    'models': {},
    'tokens': {
        'eos': 'eos_token',
        'pad': 'pad_token',
        'padding': 'padding_token',
        'unk': 'unk_token',
        'bos': 'bos_token',
        'sep': 'sep_token',
        'cls': 'cls_token',
        'mask': 'mask_token',
    },
    # One fresh, independent dict per field name, in the listed order.
    **{
        field: {}
        for field in (
            'model_metadata',
            'max_tokens',
            'tokenizers',
            'model_params',
            'model_size',
            'model_ftype',
            'n_ctx_train',
            'n_embd',
            'n_layer',
            'n_head',
            'n_head_kv',
            'n_rot',
            'n_swa',
            'n_embd_head_k',
            'n_embd_head_v',
            'n_gqa',
            'n_embd_k_gqa',
            'n_embd_v_gqa',
            'f_norm_eps',
            'f_norm_rms_eps',
            'f_clamp_kqv',
            'f_max_alibi_bias',
            'f_logit_scale',
            'n_ff',
            'n_expert',
            'n_expert_used',
            'causal_attn',
            'pooling_type',
            'rope_type',
            'rope_scaling',
            'freq_base_train',
            'freq_scale_train',
            'n_ctx_orig_yarn',
            'rope_finetuned',
            'ssm_d_conv',
            'ssm_d_inner',
            'ssm_d_state',
            'ssm_dt_rank',
            'ssm_dt_b_c_rms',
            'vocab_type',
            'model_type',
        )
    },
}
# Models to load at start-up. Each entry gives the repository id and file
# name passed to Llama.from_pretrained, plus the "name" key under which the
# loaded model is stored.
model_configs = [
    dict(
        repo_id="bartowski/Llama-3.3-70B-Instruct-GGUF",
        filename="Llama-3.3-70B-Instruct-Q2_K.gguf",
        name="testing",
    ),
]
class ModelManager:
    """Load models once and keep them keyed by their configured name."""

    def __init__(self):
        # Maps a config's 'name' to the object returned by Llama.from_pretrained.
        self.models = {}

    def load_model(self, model_config):
        """Load the model described by *model_config* unless it is already loaded.

        *model_config* must provide the keys 'name', 'repo_id' and 'filename'.
        Loading is best-effort: a failure is reported and the model is simply
        left out of ``self.models`` instead of aborting the caller.
        """
        name = model_config['name']
        if name in self.models:
            return
        try:
            self.models[name] = Llama.from_pretrained(
                repo_id=model_config['repo_id'],
                filename=model_config['filename'],
                use_auth_token=HUGGINGFACE_TOKEN,
                n_threads=8,
                use_gpu=False,
            )
        except Exception as e:
            # Previously swallowed silently, which made a missing model
            # impossible to diagnose; keep going but say what went wrong.
            print(f"Error loading model {name!r}: {e}")

    def load_all_models(self):
        """Load every entry of ``model_configs`` and return the models dict.

        Loads are submitted to a thread pool; leaving the ``with`` block waits
        for all of them to finish before the dict is returned.
        """
        with ThreadPoolExecutor() as executor:
            for config in model_configs:
                executor.submit(self.load_model, config)
        return self.models
# Load every configured model at import time and publish the resulting
# name -> model mapping through the shared registry.
model_manager = ModelManager()
global_data.update(models=model_manager.load_all_models())
# Request body accepted by the /generate endpoint.
class ChatRequest(BaseModel):
    # Text forwarded to process_message.
    message: str
def normalize_input(input_text):
    """Return *input_text* with leading and trailing whitespace removed."""
    trimmed = input_text.strip()
    return trimmed
def remove_duplicates(text):
    """Drop repeated lines from *text*, keeping each line's first occurrence.

    Lines are compared exactly (including blank lines) and the surviving
    lines keep their original relative order.
    """
    # dict keys are unique and remember insertion order, which is exactly
    # "first occurrence wins, order preserved".
    first_seen = dict.fromkeys(text.split('\n'))
    return '\n'.join(first_seen)
def cache_response(func):
    """Decorator that memoizes *func* in the module-level ``cache``.

    The key is the string form of the call's positional and keyword
    arguments, so two calls share an entry only when those render to the
    same text. Whatever *func* returns is stored and returned as-is.
    """
    import functools

    @functools.wraps(func)  # keep the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        cache_key = f"{args}-{kwargs}"
        # Read in a single step: with an expiring cache, a separate
        # "key in cache" test followed by a lookup can raise KeyError when
        # the entry expires between the two operations.
        try:
            return cache[cache_key]
        except KeyError:
            pass
        response = func(*args, **kwargs)
        cache[cache_key] = response
        return response

    return wrapper
@cache_response
def generate_model_response(model, inputs):
    """Run *model* on *inputs* and return its first completion, de-duplicated.

    *model* is called with *inputs*, and the result is expected to expose the
    generated text at ``['choices'][0]['text']``. Repeated lines are removed
    from that text. On any failure the error is reported and an empty string
    is returned so one failing model does not break the whole request.

    NOTE: the empty fallback is an ordinary return value, so the caching
    decorator stores it like any other response.
    """
    try:
        response = model(inputs)
        return remove_duplicates(response['choices'][0]['text'])
    except Exception as e:
        # Previously discarded silently; surface the reason for the empty reply.
        print(f"Error generating model response: {e}")
        return ""
def remove_repetitive_responses(responses):
    """Collapse *responses* to one entry per model.

    *responses* is an iterable of dicts with 'model' and 'response' keys.
    Returns a dict mapping each model to the first response seen for it;
    later entries for the same model are ignored.
    """
    first_by_model = {}
    for entry in responses:
        model_name = entry['model']
        if model_name in first_by_model:
            continue
        first_by_model[model_name] = entry['response']
    return first_by_model
async def process_message(message):
    """Send *message* to every loaded model and return a Markdown summary.

    The message is normalized, each model in ``global_data['models']`` is run
    in a worker thread, and the replies are rendered as one
    ``**name:**`` section per model, in the registry's order. Returns an
    empty string when no models are loaded.
    """
    inputs = normalize_input(message)
    # Snapshot (name, model) pairs so every output is matched to its model by
    # position. Pairing names with futures in *completion* order, as before,
    # credits a reply to the wrong model whenever models finish out of order.
    named_models = list(global_data['models'].items())
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as executor:
        # Await the workers rather than blocking on future.result(), so the
        # event loop stays free while the models generate. gather() returns
        # results in the order the awaitables were passed in.
        outputs = await asyncio.gather(*(
            loop.run_in_executor(executor, generate_model_response, model, inputs)
            for _, model in named_models
        ))
    responses = [
        {'model': model_name, 'response': text}
        for (model_name, _), text in zip(named_models, outputs)
    ]
    unique_responses = remove_repetitive_responses(responses)
    return "".join(
        f"**{model}:**\n{response}\n\n"
        for model, response in unique_responses.items()
    )
app = FastAPI()


@app.post("/generate")
async def generate(request: ChatRequest):
    """Handle POST /generate: run the message through all models.

    Returns ``{"response": <markdown>}`` on success. On failure returns
    ``{"error": <message>}`` with HTTP status 500, so clients can tell a
    failed request from a successful one by status code as well as by body.
    """
    try:
        response = await process_message(request.message)
        return JSONResponse(content={"response": response})
    except Exception as e:
        # Same error body as before; only the status code changed from the
        # default 200 to 500.
        return JSONResponse(content={"error": str(e)}, status_code=500)
def run_uvicorn():
    """Serve the FastAPI app on all interfaces, port 7860.

    Any exception raised while running the server is printed instead of
    propagating to the calling thread.
    """
    bind_host, bind_port = "0.0.0.0", 7860
    try:
        uvicorn.run(app, host=bind_host, port=bind_port)
    except Exception as e:
        print(f"Error al ejecutar uvicorn: {e}")
# Gradio front end: a two-line text box whose content is passed to
# process_message, with the returned text rendered as Markdown.
_message_box = gr.Textbox(lines=2, placeholder="Enter your message here...")
_markdown_output = gr.Markdown()
iface = gr.Interface(
    fn=process_message,
    inputs=_message_box,
    outputs=_markdown_output,
    title="Multi-Model LLM API (CPU Optimized)",
    description="",
)
def run_gradio():
    """Launch the Gradio interface on port 7862 with thread locking disabled."""
    launch_options = {"server_port": 7862, "prevent_thread_lock": True}
    iface.launch(**launch_options)
if __name__ == "__main__":
    # Run the API server and the Gradio UI side by side, each in its own thread.
    Thread(target=run_uvicorn).start()
    Thread(target=run_gradio).start()
    # Keep the main thread parked on an event loop. The loop is created
    # explicitly because asyncio.get_event_loop() is deprecated when no loop
    # is running and no current loop has been set.
    main_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(main_loop)
    main_loop.run_forever()