from pydantic import BaseModel
from llama_cpp import Llama
from concurrent.futures import ThreadPoolExecutor, as_completed
import re
import random
import httpx
import asyncio
import numpy as np
import gradio as gr
import os
import spaces
from dotenv import load_dotenv
import torch
from diffusers import DiffusionPipeline

load_dotenv()

HUGGINGFACE_TOKEN = os.getenv("HUGGINGFACE_TOKEN")

global_data = {
    'models': {},
    'tokens': {
        'eos': 'eos_token',
        'pad': 'pad_token',
        'padding': 'padding_token',
        'unk': 'unk_token',
        'bos': 'bos_token',
        'sep': 'sep_token',
        'cls': 'cls_token',
        'mask': 'mask_token'
    }
}

model_configs = [
    {"repo_id": "Ffftdtd5dtft/gpt2-xl-Q2_K-GGUF", "filename": "gpt2-xl-q2_k.gguf", "name": "GPT-2 XL"},
    {"repo_id": "Ffftdtd5dtft/Meta-Llama-3.1-8B-Instruct-Q2_K-GGUF", "filename": "meta-llama-3.1-8b-instruct-q2_k.gguf", "name": "Meta Llama 3.1-8B Instruct"},
    {"repo_id": "Ffftdtd5dtft/gemma-2-9b-it-Q2_K-GGUF", "filename": "gemma-2-9b-it-q2_k.gguf", "name": "Gemma 2-9B IT"},
    {"repo_id": "Ffftdtd5dtft/gemma-2-27b-Q2_K-GGUF", "filename": "gemma-2-27b-q2_k.gguf", "name": "Gemma 2-27B"},
    {"repo_id": "Ffftdtd5dtft/Phi-3-mini-128k-instruct-Q2_K-GGUF", "filename": "phi-3-mini-128k-instruct-q2_k.gguf", "name": "Phi-3 Mini 128K Instruct"},
    {"repo_id": "Ffftdtd5dtft/Meta-Llama-3.1-8B-Q2_K-GGUF", "filename": "meta-llama-3.1-8b-q2_k.gguf", "name": "Meta Llama 3.1-8B"},
    {"repo_id": "Ffftdtd5dtft/Qwen2-7B-Instruct-Q2_K-GGUF", "filename": "qwen2-7b-instruct-q2_k.gguf", "name": "Qwen2 7B Instruct"},
    {"repo_id": "Ffftdtd5dtft/starcoder2-3b-Q2_K-GGUF", "filename": "starcoder2-3b-q2_k.gguf", "name": "Starcoder2 3B"},
    {"repo_id": "Ffftdtd5dtft/Qwen2-1.5B-Instruct-Q2_K-GGUF", "filename": "qwen2-1.5b-instruct-q2_k.gguf", "name": "Qwen2 1.5B Instruct"},
    {"repo_id": "Ffftdtd5dtft/Mistral-Nemo-Instruct-2407-Q2_K-GGUF", "filename": "mistral-nemo-instruct-2407-q2_k.gguf", "name": "Mistral Nemo Instruct 2407"},
    {"repo_id": "Ffftdtd5dtft/Hermes-3-Llama-3.1-8B-IQ1_S-GGUF", "filename": "hermes-3-llama-3.1-8b-iq1_s-imat.gguf", "name": "Hermes 3 Llama 3.1-8B"},
    {"repo_id": "Ffftdtd5dtft/Phi-3.5-mini-instruct-Q2_K-GGUF", "filename": "phi-3.5-mini-instruct-q2_k.gguf", "name": "Phi 3.5 Mini Instruct"},
    {"repo_id": "Ffftdtd5dtft/codegemma-2b-IQ1_S-GGUF", "filename": "codegemma-2b-iq1_s-imat.gguf", "name": "Codegemma 2B"},
    {"repo_id": "Ffftdtd5dtft/Phi-3-mini-128k-instruct-IQ2_XXS-GGUF", "filename": "phi-3-mini-128k-instruct-iq2_xxs-imat.gguf", "name": "Phi 3 Mini 128K Instruct XXS"},
    {"repo_id": "Ffftdtd5dtft/TinyLlama-1.1B-Chat-v1.0-IQ1_S-GGUF", "filename": "tinyllama-1.1b-chat-v1.0-iq1_s-imat.gguf", "name": "TinyLlama 1.1B Chat"},
    {"repo_id": "Ffftdtd5dtft/Mistral-NeMo-Minitron-8B-Base-IQ1_S-GGUF", "filename": "mistral-nemo-minitron-8b-base-iq1_s-imat.gguf", "name": "Mistral NeMo Minitron 8B Base"}
]


class ModelManager:
    def __init__(self):
        self.models = {}

    def load_model(self, model_config):
        # Load each GGUF model once, keyed by its display name.
        if model_config['name'] not in self.models:
            try:
                self.models[model_config['name']] = Llama.from_pretrained(
                    repo_id=model_config['repo_id'],
                    filename=model_config['filename'],
                    use_auth_token=HUGGINGFACE_TOKEN
                )
            except Exception as e:
                print(f"Error loading model {model_config['name']}: {e}")

    def load_all_models(self):
        # The ThreadPoolExecutor context manager waits for every submitted
        # download to finish before exiting, so self.models is complete here.
        with ThreadPoolExecutor() as executor:
            for config in model_configs:
                executor.submit(self.load_model, config)
        return self.models


model_manager = ModelManager()
global_data['models'] = model_manager.load_all_models()
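# Usage sketch (illustrative only, kept as a comment): loading and querying a
# single entry from `model_configs` directly with llama_cpp. This assumes the
# GGUF download succeeds and uses the same completion interface that
# generate_model_response relies on further below.
#
#     llm = Llama.from_pretrained(
#         repo_id="Ffftdtd5dtft/gpt2-xl-Q2_K-GGUF",
#         filename="gpt2-xl-q2_k.gguf",
#     )
#     out = llm("Hello there, how are you?", max_tokens=64)
#     print(out['choices'][0]['text'])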
class ChatRequest(BaseModel):
    message: str


def normalize_input(input_text):
    return input_text.strip()


def remove_duplicates(text):
    # Collapse repeated greeting fragments and drop leftover [/INST] markers,
    # then de-duplicate lines while preserving their order.
    text = re.sub(r'(Hello there, how are you\? \[/INST\]){2,}', 'Hello there, how are you? [/INST]', text)
    text = re.sub(r'(How are you\? \[/INST\]){2,}', 'How are you? [/INST]', text)
    text = text.replace('[/INST]', '')
    lines = text.split('\n')
    unique_lines = []
    seen_lines = set()
    for line in lines:
        if line not in seen_lines:
            unique_lines.append(line)
            seen_lines.add(line)
    return '\n'.join(unique_lines)


dtype = torch.bfloat16 if torch.cuda.is_available() else torch.float32
device = "cuda" if torch.cuda.is_available() else "cpu"
pipe = DiffusionPipeline.from_pretrained("black-forest-labs/FLUX.1-schnell", torch_dtype=dtype).to(device)

MAX_SEED = np.iinfo(np.int32).max
MAX_IMAGE_SIZE = 2048


@spaces.GPU()
def generate_model_response(model, inputs):
    try:
        response = model(inputs)
        return remove_duplicates(response['choices'][0]['text'])
    except Exception as e:
        print(f"Error generating model response: {e}")
        return ""


@spaces.GPU()
def infer(prompt, seed=42, randomize_seed=False, width=1024, height=1024, num_inference_steps=4):
    if randomize_seed:
        seed = random.randint(0, MAX_SEED)
    generator = torch.Generator(device=device).manual_seed(seed)
    image = pipe(
        prompt=prompt,
        width=width,
        height=height,
        num_inference_steps=num_inference_steps,
        generator=generator,
        guidance_scale=0.0
    ).images[0]
    return image, seed


def remove_repetitive_responses(responses):
    unique_responses = {}
    for response in responses:
        if response['model'] not in unique_responses:
            unique_responses[response['model']] = response['response']
    return unique_responses


async def process_message(message):
    inputs = normalize_input(message)
    with ThreadPoolExecutor() as executor:
        # Map each future to its model name so responses stay paired with the
        # model that produced them, regardless of completion order.
        future_to_model = {
            executor.submit(generate_model_response, model, inputs): model_name
            for model_name, model in global_data['models'].items()
        }
        responses = [
            {'model': future_to_model[future], 'response': future.result()}
            for future in as_completed(future_to_model)
        ]
    unique_responses = remove_repetitive_responses(responses)
    formatted_response = ""
    for model, response in unique_responses.items():
        formatted_response += f"**{model}:**\n{response}\n\n"
    # Note: the /generate endpoint shown in this command is not defined in
    # this script; the command is returned for display only.
    curl_command = f"""
    curl -X POST -H "Content-Type: application/json" \\
         -d '{{"message": "{message}"}}' \\
         http://localhost:7860/generate
    """
    return formatted_response, curl_command


examples = [
    "a tiny astronaut hatching from an egg on the moon",
    "a cat holding a sign that says hello world",
    "an anime illustration of a wiener schnitzel",
]

css = """
#col-container {
    margin: 0 auto;
    max-width: 520px;
}
"""
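# Usage sketch (illustrative only, kept as a comment): the multi-model text
# pipeline above is not wired into the Gradio UI below, but it could be driven
# directly from a synchronous context, assuming at least one model loaded.
#
#     responses_md, curl_cmd = asyncio.run(process_message("Hello there, how are you?"))
#     print(responses_md)
#     print(curl_cmd)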
with gr.Blocks(css=css) as demo:
    with gr.Column(elem_id="col-container"):
        gr.Markdown("""# FLUX.1 [schnell]
12B param rectified flow transformer distilled from [FLUX.1 [pro]](https://blackforestlabs.ai/) for 4 step generation
[[blog](https://blackforestlabs.ai/announcing-black-forest-labs/)] [[model](https://huggingface.co/black-forest-labs/FLUX.1-schnell)]
""")
        with gr.Row():
            prompt = gr.Text(
                label="Prompt",
                show_label=False,
                max_lines=1,
                placeholder="Enter your prompt",
                container=False,
            )
            run_button = gr.Button("Run", scale=0)
        result = gr.Image(label="Result", show_label=False)
        with gr.Accordion("Advanced Settings", open=False):
            seed = gr.Slider(
                label="Seed",
                minimum=0,
                maximum=MAX_SEED,
                step=1,
                value=0,
            )
            randomize_seed = gr.Checkbox(label="Randomize seed", value=True)
            with gr.Row():
                width = gr.Slider(
                    label="Width",
                    minimum=256,
                    maximum=MAX_IMAGE_SIZE,
                    step=32,
                    value=1024,
                )
                height = gr.Slider(
                    label="Height",
                    minimum=256,
                    maximum=MAX_IMAGE_SIZE,
                    step=32,
                    value=1024,
                )
            with gr.Row():
                num_inference_steps = gr.Slider(
                    label="Number of inference steps",
                    minimum=1,
                    maximum=50,
                    step=1,
                    value=4,
                )
        gr.Examples(
            examples=examples,
            fn=infer,
            inputs=[prompt],
            outputs=[result, seed],
            cache_examples="lazy"
        )
    gr.on(
        triggers=[run_button.click, prompt.submit],
        fn=infer,
        inputs=[prompt, seed, randomize_seed, width, height, num_inference_steps],
        outputs=[result, seed]
    )

if __name__ == "__main__":
    port = int(os.environ.get("PORT", 7860))
    demo.launch(server_port=port)
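# Run sketch (assumptions: HUGGINGFACE_TOKEN is set in a local .env file, a
# CUDA GPU is available for the FLUX.1-schnell pipeline, and the script is
# saved as app.py, which is only an example filename):
#
#     PORT=7860 python app.py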