# Spaces: Running
import asyncio
import functools
import logging
import os
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Thread

import cachetools
import gptcache
import gradio as gr
import httpx
import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from llama_cpp import Llama
from pydantic import BaseModel
# Load settings through python-dotenv before the environment is read below.
load_dotenv()

# Hugging Face access token; None when the variable is not set.
HUGGINGFACE_TOKEN = os.environ.get("HUGGINGFACE_TOKEN")

# Shared response cache: bounded to 100 entries with a TTL of 60.
cache = cachetools.TTLCache(ttl=60, maxsize=100)
# Process-wide registry shared by the rest of the module.
#   'models' - filled in after model loading with a name -> model mapping.
#   'tokens' - short token kind -> token attribute name.
#   the rest - one initially empty dict per metadata field.
# NOTE(review): the remaining field names look like llama.cpp model
# metadata keys; nothing in this file populates them - confirm they are used.
global_data = {
    'models': {},
    'tokens': {
        'eos': 'eos_token',
        'pad': 'pad_token',
        'padding': 'padding_token',
        'unk': 'unk_token',
        'bos': 'bos_token',
        'sep': 'sep_token',
        'cls': 'cls_token',
        'mask': 'mask_token',
    },
    # The comprehension creates a fresh dict per field, so no two fields
    # ever share the same underlying object.
    **{
        field_name: {}
        for field_name in (
            'model_metadata',
            'max_tokens',
            'tokenizers',
            'model_params',
            'model_size',
            'model_ftype',
            'n_ctx_train',
            'n_embd',
            'n_layer',
            'n_head',
            'n_head_kv',
            'n_rot',
            'n_swa',
            'n_embd_head_k',
            'n_embd_head_v',
            'n_gqa',
            'n_embd_k_gqa',
            'n_embd_v_gqa',
            'f_norm_eps',
            'f_norm_rms_eps',
            'f_clamp_kqv',
            'f_max_alibi_bias',
            'f_logit_scale',
            'n_ff',
            'n_expert',
            'n_expert_used',
            'causal_attn',
            'pooling_type',
            'rope_type',
            'rope_scaling',
            'freq_base_train',
            'freq_scale_train',
            'n_ctx_orig_yarn',
            'rope_finetuned',
            'ssm_d_conv',
            'ssm_d_inner',
            'ssm_d_state',
            'ssm_dt_rank',
            'ssm_dt_b_c_rms',
            'vocab_type',
            'model_type',
        )
    },
}
# Models to load at start-up. Each entry names a Hugging Face repository,
# the file to fetch from it, and the name the model is registered under.
model_configs = [
    dict(
        repo_id="bartowski/Llama-3.3-70B-Instruct-GGUF",
        filename="Llama-3.3-70B-Instruct-Q2_K.gguf",
        name="testing",
    ),
]
class ModelManager:
    """Load the configured models once and keep them keyed by name.

    Attributes:
        models: Mapping of a configuration's ``name`` to the object returned
            by ``Llama.from_pretrained`` for it.
    """

    def __init__(self):
        self.models = {}

    def load_model(self, model_config):
        """Load one model unless an entry with the same name already exists.

        Loading is best-effort: a failure is logged and the model is left out
        of ``self.models`` so that the remaining models can still be loaded.

        Args:
            model_config: Mapping with the keys ``name``, ``repo_id`` and
                ``filename``.
        """
        name = model_config['name']
        if name in self.models:
            return
        try:
            # NOTE(review): confirm that ``use_auth_token`` and ``use_gpu``
            # are actually honoured by ``Llama.from_pretrained``; they may be
            # accepted and silently ignored.
            self.models[name] = Llama.from_pretrained(
                repo_id=model_config['repo_id'],
                filename=model_config['filename'],
                use_auth_token=HUGGINGFACE_TOKEN,
                n_threads=8,
                use_gpu=False
            )
        except Exception:
            # Previously swallowed with ``pass``, which made a missing model
            # impossible to diagnose. Keep going, but leave a traceback.
            logging.getLogger(__name__).exception(
                "Failed to load model %r", name
            )

    def load_all_models(self, configs=None):
        """Load every configuration concurrently and return ``self.models``.

        Args:
            configs: Iterable of model configurations. Defaults to the
                module-level ``model_configs``.

        Returns:
            The ``self.models`` mapping (the same object, not a copy).
        """
        if configs is None:
            configs = model_configs
        # Leaving the ``with`` block waits for every submitted load to finish.
        with ThreadPoolExecutor() as executor:
            for config in configs:
                executor.submit(self.load_model, config)
        return self.models
# Load every configured model as soon as the module is imported and publish
# the resulting name -> model mapping through the shared registry.
model_manager = ModelManager()
global_data.update(models=model_manager.load_all_models())
# Request payload for the chat endpoint.
class ChatRequest(BaseModel):
    # Text the user wants the models to answer.
    message: str
def normalize_input(input_text):
    """Return ``input_text`` without its leading and trailing whitespace."""
    trimmed = input_text.strip()
    return trimmed
def remove_duplicates(text):
    """Drop every repeated line of ``text``, keeping the first occurrence.

    Lines are compared exactly (including whitespace), and the surviving
    lines keep their original relative order.
    """
    # dict.fromkeys keeps the first occurrence of each key in insertion
    # order, which is exactly an order-preserving de-duplication.
    first_occurrences = dict.fromkeys(text.split('\n'))
    return '\n'.join(first_occurrences)
def cache_response(func):
    """Decorator that stores the results of ``func`` in the module ``cache``.

    Entries are keyed by the decorated function's identity plus the ``repr``
    of its arguments, so unhashable arguments are accepted and two decorated
    functions never read each other's entries.

    Args:
        func: The callable whose results should be cached.

    Returns:
        A wrapper with the same signature and metadata as ``func``.
    """
    func_id = "{}.{}".format(
        getattr(func, '__module__', ''),
        getattr(func, '__qualname__', repr(func)),
    )

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Sorting makes f(a=1, b=2) and f(b=2, a=1) share one entry.
        cache_key = f"{func_id}:{args}-{sorted(kwargs.items())}"
        try:
            # A single lookup: with a TTL cache, an entry can expire between
            # an ``in`` check and the subsequent read.
            return cache[cache_key]
        except KeyError:
            pass
        response = func(*args, **kwargs)
        cache[cache_key] = response
        return response

    return wrapper
def generate_model_response(model, inputs):
    """Run ``model`` on ``inputs`` and return its de-duplicated completion text.

    Args:
        model: Callable taking the prompt and returning a mapping that holds
            the text at ``['choices'][0]['text']``.
        inputs: Prompt passed to the model.

    Returns:
        The completion with repeated lines removed, or ``""`` when the model
        raises or returns an unexpected structure.
    """
    try:
        response = model(inputs)
        return remove_duplicates(response['choices'][0]['text'])
    except Exception:
        # Deliberately best-effort so one failing model cannot break the
        # whole request, but record the failure instead of hiding it.
        logging.getLogger(__name__).exception("Model inference failed")
        return ""
def remove_repetitive_responses(responses):
    """Collapse ``responses`` to one answer per model.

    Args:
        responses: Iterable of mappings with the keys ``model`` and
            ``response``.

    Returns:
        A dict mapping each model name to the first response seen for it,
        in order of first appearance.
    """
    first_by_model = {}
    for entry in responses:
        model_name = entry['model']
        if model_name in first_by_model:
            # A later answer from an already-seen model is discarded.
            continue
        first_by_model[model_name] = entry['response']
    return first_by_model
async def process_message(message):
    """Send ``message`` to every loaded model and format the answers.

    Args:
        message: Raw user text; it is normalised before being sent.

    Returns:
        A Markdown string with one ``**name:**`` section per model, or an
        empty string when no model is loaded.
    """
    inputs = normalize_input(message)
    # Snapshot the registry so each name stays paired with its own model.
    models = list(global_data['models'].items())
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as executor:
        pending = [
            loop.run_in_executor(executor, generate_model_response, model, inputs)
            for _, model in models
        ]
        # Awaiting keeps the event loop free while the models run, and
        # gather() returns results in submission order. Pairing names with
        # as_completed() used completion order and could attribute an answer
        # to the wrong model.
        texts = await asyncio.gather(*pending)
    responses = [
        {'model': name, 'response': text}
        for (name, _), text in zip(models, texts)
    ]
    unique_responses = remove_repetitive_responses(responses)
    return "".join(
        f"**{model}:**\n{response}\n\n"
        for model, response in unique_responses.items()
    )
app = FastAPI()


# The handler was never registered on ``app``, so the API exposed no route.
# NOTE(review): the path is taken from the handler name - confirm it matches
# what clients call.
@app.post("/generate")
async def generate(request: ChatRequest):
    """Answer a chat request with the combined output of all models.

    Args:
        request: Validated request body carrying the user's ``message``.

    Returns:
        ``{"response": ...}`` on success, or ``{"error": ...}`` with HTTP
        status 500 when processing fails.
    """
    try:
        response = await process_message(request.message)
    except Exception as e:
        logging.getLogger(__name__).exception("Failed to process chat request")
        # Report the failure as a server error instead of a 200 response.
        return JSONResponse(status_code=500, content={"error": str(e)})
    return JSONResponse(content={"response": response})
def run_uvicorn():
    """Serve ``app`` with uvicorn on 0.0.0.0:7860.

    Any exception raised by the server is printed rather than propagated.
    """
    server_options = {"host": "0.0.0.0", "port": 7860}
    try:
        uvicorn.run(app, **server_options)
    except Exception as exc:
        print(f"Error al ejecutar uvicorn: {exc}")
# Gradio front end: a two-line text box whose content is passed to
# process_message, with the result rendered as Markdown.
iface = gr.Interface(
    title="Multi-Model LLM API (CPU Optimized)",
    description="",
    fn=process_message,
    inputs=gr.Textbox(placeholder="Enter your message here...", lines=2),
    outputs=gr.Markdown(),
)
def run_gradio():
    """Launch the Gradio interface on port 7862 with thread locking disabled."""
    launch_options = dict(server_port=7862, prevent_thread_lock=True)
    iface.launch(**launch_options)
if __name__ == "__main__":
    # Run the API server and the Gradio UI in their own threads.
    Thread(target=run_uvicorn).start()
    Thread(target=run_gradio).start()
    # Keep the main thread parked on an event loop. The loop is created
    # explicitly because asyncio.get_event_loop() with no current loop is
    # deprecated and raises RuntimeError on Python 3.14+.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_forever()
    finally:
        loop.close()