#!/usr/bin/env python | |
import os | |
from threading import Thread | |
from typing import Iterator | |
import gradio as gr | |
import spaces | |
import torch | |
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer | |
# Debugging: Start script
print("Starting script...")

# Hugging Face access token; forwarded to from_pretrained() when the model
# and tokenizer are loaded below.
HF_TOKEN = os.environ.get("HF_TOKEN")
if HF_TOKEN is None:
    print("Warning: HF_TOKEN is not set!")

# Password checked by the login gate. Set it through the APP_PASSWORD
# environment variable; the hard-coded fallback is kept only for backward
# compatibility and is announced loudly so it is not used by accident.
PASSWORD = os.getenv("APP_PASSWORD")
if PASSWORD is None:
    print("Warning: APP_PASSWORD is not set; falling back to the built-in default password!")
    PASSWORD = "mysecretpassword"

# Markdown shown at the top of the UI.
DESCRIPTION = "# FT of Lama"

# Upper bound and default for the "Max new tokens" slider.
MAX_MAX_NEW_TOKENS = 2048
DEFAULT_MAX_NEW_TOKENS = 1024
# Prompts longer than this many tokens are trimmed before generation.
MAX_INPUT_TOKEN_LENGTH = int(os.getenv("MAX_INPUT_TOKEN_LENGTH", "4096"))

if not torch.cuda.is_available():
    # NOTE: on this path `model` and `tokenizer` are never defined, so any
    # later code that uses them will fail with NameError.
    DESCRIPTION += "\n<p>Running on CPU 🥶 This demo does not work on CPU.</p>"
    print("Warning: No GPU available. This model cannot run on CPU.")
else:
    print("GPU is available!")
    # Debugging: GPU check passed, loading model
    model_id = "BGLAW/bggpt-Instruct-bglawinsv1UNS_merged"
    try:
        print("Loading model...")
        model = AutoModelForCausalLM.from_pretrained(
            model_id, torch_dtype=torch.float16, device_map="auto", token=HF_TOKEN
        )
        print("Model loaded successfully!")
        print("Loading tokenizer...")
        tokenizer = AutoTokenizer.from_pretrained(model_id, token=HF_TOKEN)
        print("Tokenizer loaded successfully!")
    except Exception as e:
        print(f"Error loading model or tokenizer: {e}")
        raise  # Re-raise the original error (traceback intact) after logging it
def _build_conversation(message: str, chat_history: list[tuple[str, str]]) -> list[dict[str, str]]:
    """Turn (user, assistant) history pairs plus the new message into role/content dicts."""
    conversation = []
    for user, assistant in chat_history:
        conversation.append({"role": "user", "content": user})
        conversation.append({"role": "assistant", "content": assistant})
    conversation.append({"role": "user", "content": message})
    return conversation


def generate(
    message: str,
    chat_history: list[tuple[str, str]],
    max_new_tokens: int = 1024,
    temperature: float = 0.6,
    top_p: float = 0.9,
    top_k: int = 50,
    repetition_penalty: float = 1.2,
) -> Iterator[str]:
    """Stream the model's reply to ``message`` given the previous chat turns.

    The conversation is rendered with the tokenizer's chat template, trimmed
    to the last ``MAX_INPUT_TOKEN_LENGTH`` tokens if needed, and passed to
    ``model.generate`` on a background thread. Text is read back through a
    ``TextIteratorStreamer``.

    Args:
        message: The new user message.
        chat_history: Earlier turns as ``(user, assistant)`` pairs.
        max_new_tokens: Forwarded to ``model.generate``.
        temperature: Forwarded to ``model.generate``.
        top_p: Forwarded to ``model.generate``.
        top_k: Forwarded to ``model.generate``.
        repetition_penalty: Forwarded to ``model.generate``.

    Yields:
        The full reply accumulated so far, once per chunk received from the
        streamer (each yield contains all previous text, not just the delta).

    Raises:
        Exception: Any error raised while tokenizing or streaming is logged
            and re-raised unchanged.
    """
    print(f"Received message: {message}")
    print(f"Chat history: {chat_history}")
    conversation = _build_conversation(message, chat_history)
    try:
        print("Tokenizing input...")
        # NOTE(review): the template is applied without add_generation_prompt;
        # confirm this model's chat template does not need it.
        input_ids = tokenizer.apply_chat_template(conversation, return_tensors="pt")
        print(f"Input tokenized: {input_ids.shape}")
        if input_ids.shape[1] > MAX_INPUT_TOKEN_LENGTH:
            # Keep only the most recent tokens so the newest turns survive.
            input_ids = input_ids[:, -MAX_INPUT_TOKEN_LENGTH:]
            gr.Warning(f"Trimmed input from conversation as it was longer than {MAX_INPUT_TOKEN_LENGTH} tokens.")
            print("Trimmed input tokens due to length.")
        input_ids = input_ids.to(model.device)
        print("Input moved to the model's device.")
        streamer = TextIteratorStreamer(tokenizer, timeout=20.0, skip_prompt=True, skip_special_tokens=True)
        generate_kwargs = dict(
            {"input_ids": input_ids},
            streamer=streamer,
            max_new_tokens=max_new_tokens,
            do_sample=True,
            top_p=top_p,
            top_k=top_k,
            temperature=temperature,
            num_beams=1,
            repetition_penalty=repetition_penalty,
        )
        print("Starting generation...")
        # Generation runs on its own thread so this generator can consume the
        # streamer while tokens are still being produced.
        t = Thread(target=model.generate, kwargs=generate_kwargs)
        t.start()
        print("Thread started for model generation.")
        outputs = []
        for text in streamer:
            outputs.append(text)
            # Join once per chunk and reuse it for both the log and the yield.
            partial_reply = "".join(outputs)
            print(f"Generated text so far: {partial_reply}")
            yield partial_reply
    except Exception as e:
        print(f"Error during generation: {e}")
        raise  # Re-raise the original error (traceback intact) after logging it
def password_auth(password):
    """Check the submitted password and return visibility updates for the UI.

    Args:
        password: The text entered in the password box.

    Returns:
        A pair of ``gr.update`` objects: the first controls the visibility of
        the chat area, the second the visibility (and text) of the
        "incorrect password" message. On a match the chat area is shown and
        the message hidden; otherwise the reverse.
    """
    # Local import keeps this block self-contained; hmac is standard library.
    import hmac

    # Constant-time comparison avoids leaking how many leading characters of
    # the secret matched. Encoding to bytes is required because
    # compare_digest only accepts str arguments that are pure ASCII.
    # Non-string input (e.g. None) never matches, as with the plain `==`.
    is_match = isinstance(password, str) and hmac.compare_digest(
        password.encode("utf-8"), PASSWORD.encode("utf-8")
    )
    if is_match:
        return gr.update(visible=True), gr.update(visible=False)
    return gr.update(visible=False), gr.update(visible=True, value="Incorrect password. Try again.")
# One row per generation-parameter slider: (label, minimum, maximum, step, default).
# The order matches the extra parameters of generate() after message/history:
# max_new_tokens, temperature, top_p, top_k, repetition_penalty.
_slider_specs = [
    ("Max new tokens", 1, MAX_MAX_NEW_TOKENS, 1, DEFAULT_MAX_NEW_TOKENS),
    ("Temperature", 0.1, 4.0, 0.1, 0.6),
    ("Top-p (nucleus sampling)", 0.05, 1.0, 0.05, 0.9),
    ("Top-k", 1, 1000, 1, 50),
    ("Repetition penalty", 1.0, 2.0, 0.05, 1.2),
]

# Sample prompts offered in the chat UI.
_example_prompts = [
    "Hello there! How are you doing?",
    "Can you explain briefly to me what is the Python programming language?",
    "Explain the plot of Cinderella in a sentence.",
    "How many hours does it take a man to eat a Helicopter?",
    "Write a 100-word article on 'Benefits of Open-Source in AI research'",
]

# Chat UI that streams replies from generate(); the sliders are passed to it
# as additional inputs. No stop button is shown.
chat_interface = gr.ChatInterface(
    fn=generate,
    additional_inputs=[
        gr.Slider(label=label, minimum=lowest, maximum=highest, step=step, value=default)
        for label, lowest, highest, step, default in _slider_specs
    ],
    stop_btn=None,
    examples=[[prompt] for prompt in _example_prompts],
)
# Debugging: Interface setup
print("Setting up interface...")

# Page layout: a password prompt that is visible at start, and the chat area
# that stays hidden until password_auth() reports a match.
# NOTE(review): this gate only toggles component visibility; the chat handler
# is presumably still reachable through the app's API. For real access
# control consider Gradio's launch(auth=...) — confirm before relying on it.
with gr.Blocks(css="style.css") as demo:
    # Heading is rendered once here, outside both areas, so it is always
    # visible and is not repeated once the chat area appears.
    gr.Markdown(DESCRIPTION)

    # Create login components
    with gr.Row(visible=True) as login_area:
        password_input = gr.Textbox(
            label="Enter Password", type="password", placeholder="Password", show_label=True
        )
        login_btn = gr.Button("Submit")
        incorrect_password_msg = gr.Markdown("Incorrect password. Try again.", visible=False)

    # Main chat interface
    with gr.Column(visible=False) as chat_area:
        gr.DuplicateButton(
            value="Duplicate Space for private use",
            elem_id="duplicate-button",
            visible=os.getenv("SHOW_DUPLICATE_BUTTON") == "1",
        )
        chat_interface.render()

    # Check the password when the button is clicked or Enter is pressed in
    # the password box; both routes use the same handler and outputs.
    login_btn.click(password_auth, inputs=password_input, outputs=[chat_area, incorrect_password_msg])
    password_input.submit(password_auth, inputs=password_input, outputs=[chat_area, incorrect_password_msg])

if __name__ == "__main__":
    # Debugging: Starting queue and launching the demo
    print("Launching demo...")
    demo.queue(max_size=20).launch(share=True)
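# Previous working version (Mistral-7B-Instruct-v0.2, @spaces.GPU, no password gate), kept commented out for reference: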
# #!/usr/bin/env python | |
# import os | |
# from threading import Thread | |
# from typing import Iterator | |
# import gradio as gr | |
# import spaces | |
# import torch | |
# from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer | |
# # Debugging: Start script | |
# print("Starting script...") | |
# HF_TOKEN = os.environ.get("HF_TOKEN") | |
# if HF_TOKEN is None: | |
# print("Warning: HF_TOKEN is not set!") | |
# DESCRIPTION = "# Mistral-7B v0.2" | |
# if not torch.cuda.is_available(): | |
# DESCRIPTION += "\n<p>Running on CPU 🥶 This demo does not work on CPU.</p>" | |
# print("Warning: No GPU available. This model cannot run on CPU.") | |
# else: | |
# print("GPU is available!") | |
# MAX_MAX_NEW_TOKENS = 2048 | |
# DEFAULT_MAX_NEW_TOKENS = 1024 | |
# MAX_INPUT_TOKEN_LENGTH = int(os.getenv("MAX_INPUT_TOKEN_LENGTH", "4096")) | |
# # Debugging: GPU check passed, loading model | |
# if torch.cuda.is_available(): | |
# model_id = "mistralai/Mistral-7B-Instruct-v0.2" | |
# try: | |
# print("Loading model...") | |
# model = AutoModelForCausalLM.from_pretrained(model_id, torch_dtype=torch.float16, device_map="auto", token=HF_TOKEN) | |
# print("Model loaded successfully!") | |
# print("Loading tokenizer...") | |
# tokenizer = AutoTokenizer.from_pretrained(model_id, token=HF_TOKEN) | |
# print("Tokenizer loaded successfully!") | |
# except Exception as e: | |
# print(f"Error loading model or tokenizer: {e}") | |
# raise e # Re-raise the error after logging it | |
# @spaces.GPU | |
# def generate( | |
# message: str, | |
# chat_history: list[tuple[str, str]], | |
# max_new_tokens: int = 1024, | |
# temperature: float = 0.6, | |
# top_p: float = 0.9, | |
# top_k: int = 50, | |
# repetition_penalty: float = 1.2, | |
# ) -> Iterator[str]: | |
# print(f"Received message: {message}") | |
# print(f"Chat history: {chat_history}") | |
# conversation = [] | |
# for user, assistant in chat_history: | |
# conversation.extend([{"role": "user", "content": user}, {"role": "assistant", "content": assistant}]) | |
# conversation.append({"role": "user", "content": message}) | |
# try: | |
# print("Tokenizing input...") | |
# input_ids = tokenizer.apply_chat_template(conversation, return_tensors="pt") | |
# print(f"Input tokenized: {input_ids.shape}") | |
# if input_ids.shape[1] > MAX_INPUT_TOKEN_LENGTH: | |
# input_ids = input_ids[:, -MAX_INPUT_TOKEN_LENGTH:] | |
# gr.Warning(f"Trimmed input from conversation as it was longer than {MAX_INPUT_TOKEN_LENGTH} tokens.") | |
# print("Trimmed input tokens due to length.") | |
# input_ids = input_ids.to(model.device) | |
# print("Input moved to the model's device.") | |
# streamer = TextIteratorStreamer(tokenizer, timeout=20.0, skip_prompt=True, skip_special_tokens=True) | |
# generate_kwargs = dict( | |
# {"input_ids": input_ids}, | |
# streamer=streamer, | |
# max_new_tokens=max_new_tokens, | |
# do_sample=True, | |
# top_p=top_p, | |
# top_k=top_k, | |
# temperature=temperature, | |
# num_beams=1, | |
# repetition_penalty=repetition_penalty, | |
# ) | |
# print("Starting generation...") | |
# t = Thread(target=model.generate, kwargs=generate_kwargs) | |
# t.start() | |
# print("Thread started for model generation.") | |
# outputs = [] | |
# for text in streamer: | |
# outputs.append(text) | |
# print(f"Generated text so far: {''.join(outputs)}") | |
# yield "".join(outputs) | |
# except Exception as e: | |
# print(f"Error during generation: {e}") | |
# raise e # Re-raise the error after logging it | |
# chat_interface = gr.ChatInterface( | |
# fn=generate, | |
# additional_inputs=[ | |
# gr.Slider( | |
# label="Max new tokens", | |
# minimum=1, | |
# maximum=MAX_MAX_NEW_TOKENS, | |
# step=1, | |
# value=DEFAULT_MAX_NEW_TOKENS, | |
# ), | |
# gr.Slider( | |
# label="Temperature", | |
# minimum=0.1, | |
# maximum=4.0, | |
# step=0.1, | |
# value=0.6, | |
# ), | |
# gr.Slider( | |
# label="Top-p (nucleus sampling)", | |
# minimum=0.05, | |
# maximum=1.0, | |
# step=0.05, | |
# value=0.9, | |
# ), | |
# gr.Slider( | |
# label="Top-k", | |
# minimum=1, | |
# maximum=1000, | |
# step=1, | |
# value=50, | |
# ), | |
# gr.Slider( | |
# label="Repetition penalty", | |
# minimum=1.0, | |
# maximum=2.0, | |
# step=0.05, | |
# value=1.2, | |
# ), | |
# ], | |
# stop_btn=None, | |
# examples=[ | |
# ["Hello there! How are you doing?"], | |
# ["Can you explain briefly to me what is the Python programming language?"], | |
# ["Explain the plot of Cinderella in a sentence."], | |
# ["How many hours does it take a man to eat a Helicopter?"], | |
# ["Write a 100-word article on 'Benefits of Open-Source in AI research'"], | |
# ], | |
# ) | |
# # Debugging: Interface setup | |
# print("Setting up interface...") | |
# with gr.Blocks(css="style.css") as demo: | |
# gr.Markdown(DESCRIPTION) | |
# gr.DuplicateButton( | |
# value="Duplicate Space for private use", | |
# elem_id="duplicate-button", | |
# visible=os.getenv("SHOW_DUPLICATE_BUTTON") == "1", | |
# ) | |
# chat_interface.render() | |
# # Debugging: Starting queue and launching the demo | |
# print("Launching demo...") | |
# if __name__ == "__main__": | |
# demo.queue(max_size=20).launch(share=True) | |