Spaces:
Running
Running
# app.py (second app by claude) | |
import gradio as gr | |
import torch | |
from transformers import PreTrainedTokenizerFast | |
from pathlib import Path | |
import sys | |
# sys.path.append(str(Path('gpt_model_code.py').resolve())) | |
# from gpt_model_code import load_model_n_tokenizer, generate | |
# ==================================================================================- | |
# ==================================================================================- | |
# ==================================================================================- | |
# ==================================================================================- | |
# ==================================================================================- | |
# could not make importing from gpt_model_code.py work, so i copied the code here | |
# Copyright (c) Sebastian Raschka under Apache License 2.0 (see LICENSE.txt). | |
# Source for "Build a Large Language Model From Scratch" | |
# - https://www.manning.com/books/build-a-large-language-model-from-scratch | |
# Code: https://github.com/rasbt/LLMs-from-scratch | |
# | |
# This file collects all the relevant code that we covered thus far | |
# throughout Chapters 2-5. | |
import torch | |
import torch.nn as nn | |
from huggingface_hub import PyTorchModelHubMixin | |
from transformers import PreTrainedTokenizerFast | |
GPT_CONFIG_124M = { | |
"vocab_size": 50000, # Vocabulary size | |
"context_length": 1024, # Context length | |
"emb_dim": 768, # Embedding dimension | |
"n_heads": 12, # Number of attention heads | |
"n_layers": 12, # Number of layers | |
"drop_rate": 0.1, # Dropout rate | |
"qkv_bias": False # Query-key-value bias | |
} | |
##################################### | |
# Chapter 3 | |
##################################### | |
class MultiHeadAttention(nn.Module): | |
def __init__(self, d_in, d_out, context_length, dropout, num_heads, qkv_bias=False): | |
super().__init__() | |
assert d_out % num_heads == 0, "d_out must be divisible by n_heads" | |
self.d_out = d_out | |
self.num_heads = num_heads | |
self.head_dim = d_out // num_heads # Reduce the projection dim to match desired output dim | |
self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias) | |
self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias) | |
self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias) | |
self.out_proj = nn.Linear(d_out, d_out) # Linear layer to combine head outputs | |
self.dropout = nn.Dropout(dropout) | |
self.register_buffer('mask', torch.triu(torch.ones(context_length, context_length), diagonal=1)) | |
def forward(self, x): | |
b, num_tokens, d_in = x.shape | |
keys = self.W_key(x) # Shape: (b, num_tokens, d_out) | |
queries = self.W_query(x) | |
values = self.W_value(x) | |
# We implicitly split the matrix by adding a `num_heads` dimension | |
# Unroll last dim: (b, num_tokens, d_out) -> (b, num_tokens, num_heads, head_dim) | |
keys = keys.view(b, num_tokens, self.num_heads, self.head_dim) | |
values = values.view(b, num_tokens, self.num_heads, self.head_dim) | |
queries = queries.view(b, num_tokens, self.num_heads, self.head_dim) | |
# Transpose: (b, num_tokens, num_heads, head_dim) -> (b, num_heads, num_tokens, head_dim) | |
keys = keys.transpose(1, 2) | |
queries = queries.transpose(1, 2) | |
values = values.transpose(1, 2) | |
# Compute scaled dot-product attention (aka self-attention) with a causal mask | |
attn_scores = queries @ keys.transpose(2, 3) # Dot product for each head | |
# Original mask truncated to the number of tokens and converted to boolean | |
mask_bool = self.mask.bool()[:num_tokens, :num_tokens] | |
# Use the mask to fill attention scores | |
attn_scores.masked_fill_(mask_bool, -torch.inf) | |
attn_weights = torch.softmax(attn_scores / keys.shape[-1]**0.5, dim=-1) | |
attn_weights = self.dropout(attn_weights) | |
# Shape: (b, num_tokens, num_heads, head_dim) | |
context_vec = (attn_weights @ values).transpose(1, 2) | |
# Combine heads, where self.d_out = self.num_heads * self.head_dim | |
context_vec = context_vec.reshape(b, num_tokens, self.d_out) | |
context_vec = self.out_proj(context_vec) # optional projection | |
return context_vec | |
##################################### | |
# Chapter 4 | |
##################################### | |
class LayerNorm(nn.Module): | |
def __init__(self, emb_dim): | |
super().__init__() | |
self.eps = 1e-5 | |
self.scale = nn.Parameter(torch.ones(emb_dim)) | |
self.shift = nn.Parameter(torch.zeros(emb_dim)) | |
def forward(self, x): | |
mean = x.mean(dim=-1, keepdim=True) | |
var = x.var(dim=-1, keepdim=True, unbiased=False) | |
norm_x = (x - mean) / torch.sqrt(var + self.eps) | |
return self.scale * norm_x + self.shift | |
class GELU(nn.Module): | |
def __init__(self): | |
super().__init__() | |
def forward(self, x): | |
return 0.5 * x * (1 + torch.tanh( | |
torch.sqrt(torch.tensor(2.0 / torch.pi)) * | |
(x + 0.044715 * torch.pow(x, 3)) | |
)) | |
class FeedForward(nn.Module): | |
def __init__(self, cfg): | |
super().__init__() | |
self.layers = nn.Sequential( | |
nn.Linear(cfg["emb_dim"], 4 * cfg["emb_dim"]), | |
GELU(), | |
nn.Linear(4 * cfg["emb_dim"], cfg["emb_dim"]), | |
) | |
def forward(self, x): | |
return self.layers(x) | |
class TransformerBlock(nn.Module): | |
def __init__(self, cfg): | |
super().__init__() | |
self.att = MultiHeadAttention( | |
d_in=cfg["emb_dim"], | |
d_out=cfg["emb_dim"], | |
context_length=cfg["context_length"], | |
num_heads=cfg["n_heads"], | |
dropout=cfg["drop_rate"], | |
qkv_bias=cfg["qkv_bias"]) | |
self.ff = FeedForward(cfg) | |
self.norm1 = LayerNorm(cfg["emb_dim"]) | |
self.norm2 = LayerNorm(cfg["emb_dim"]) | |
self.drop_shortcut = nn.Dropout(cfg["drop_rate"]) | |
def forward(self, x): | |
# Shortcut connection for attention block | |
shortcut = x | |
x = self.norm1(x) | |
x = self.att(x) # Shape [batch_size, num_tokens, emb_size] | |
x = self.drop_shortcut(x) | |
x = x + shortcut # Add the original input back | |
# Shortcut connection for feed-forward block | |
shortcut = x | |
x = self.norm2(x) | |
x = self.ff(x) | |
x = self.drop_shortcut(x) | |
x = x + shortcut # Add the original input back | |
return x | |
class GPTModel(nn.Module, | |
PyTorchModelHubMixin, # modified to push the model to the hub (https://huggingface.co/docs/hub/en/models-uploading#upload-a-pytorch-model-using-huggingfacehub) | |
repo_url="https://huggingface.co/Aananda-giri/GPT2-Nepali/", | |
pipeline_tag="text-generation", | |
): | |
def __init__(self, cfg): | |
super().__init__() | |
self.tok_emb = nn.Embedding(cfg["vocab_size"], cfg["emb_dim"]) | |
self.pos_emb = nn.Embedding(cfg["context_length"], cfg["emb_dim"]) | |
self.drop_emb = nn.Dropout(cfg["drop_rate"]) | |
self.trf_blocks = nn.Sequential( | |
*[TransformerBlock(cfg) for _ in range(cfg["n_layers"])]) | |
self.final_norm = LayerNorm(cfg["emb_dim"]) | |
self.out_head = nn.Linear(cfg["emb_dim"], cfg["vocab_size"], bias=False) | |
def forward(self, in_idx): | |
batch_size, seq_len = in_idx.shape | |
tok_embeds = self.tok_emb(in_idx) | |
pos_embeds = self.pos_emb(torch.arange(seq_len, device=in_idx.device)) | |
x = tok_embeds + pos_embeds # Shape [batch_size, num_tokens, emb_size] | |
x = self.drop_emb(x) | |
x = self.trf_blocks(x) | |
x = self.final_norm(x) | |
logits = self.out_head(x) | |
return logits | |
##################################### | |
# Chapter 5 | |
##################################### | |
def text_to_token_ids(text, tokenizer): | |
encoded = tokenizer.encode(text) | |
encoded_tensor = torch.tensor(encoded).unsqueeze(0) # add batch dimension | |
return encoded_tensor | |
def token_ids_to_text(token_ids, tokenizer): | |
flat = token_ids.squeeze(0) # remove batch dimension | |
return tokenizer.decode(flat.tolist()) | |
def load_model_n_tokenizer(): | |
model = GPTModel.from_pretrained("Aananda-giri/GPT2-Nepali") | |
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
model.to(device) | |
tokenizer = PreTrainedTokenizerFast.from_pretrained("Aananda-giri/GPT2-Nepali") | |
return model, tokenizer | |
def generate( | |
model, | |
prompt, | |
tokenizer, | |
max_new_tokens, | |
temperature=0.7, | |
top_k=50, | |
top_p=None, # New parameter for nucleus sampling | |
eos_id=None, | |
repetition_penalty=1.2, | |
penalize_len_below=50 | |
): | |
context_size = GPT_CONFIG_124M['context_length'] | |
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
idx = text_to_token_ids(prompt, tokenizer).to(device) | |
if not eos_id: | |
encoded_endoftext = tokenizer.encode("<|endoftext|>") | |
eos_id = encoded_endoftext[0] if encoded_endoftext else None | |
token_freq = {} | |
for step in range(max_new_tokens): | |
idx_cond = idx[:, -context_size:] | |
with torch.no_grad(): | |
logits = model(idx_cond) | |
logits = logits[:, -1, :] | |
# Apply repetition penalty | |
for token_id in idx[0].tolist(): | |
if token_id in token_freq: | |
logits[0, token_id] /= repetition_penalty | |
else: | |
token_freq[token_id] = 1 | |
# Penalize EOT token for shorter sequences | |
if eos_id is not None and step < penalize_len_below: | |
logits[0, eos_id] /= (penalize_len_below - step) / penalize_len_below | |
# Apply temperature scaling | |
if temperature > 0.0: | |
logits = logits / temperature | |
# Convert logits to probabilities | |
probs = torch.softmax(logits, dim=-1) | |
# Apply top-p (nucleus) sampling if specified | |
if top_p: | |
sorted_probs, sorted_indices = torch.sort(probs, descending=True) | |
cumulative_probs = torch.cumsum(sorted_probs, dim=-1) | |
# Remove tokens with cumulative probability above the threshold | |
sorted_indices_to_remove = cumulative_probs > top_p | |
# Shift the indices to the right to keep also the first token above the threshold | |
sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone() | |
sorted_indices_to_remove[..., 0] = 0 | |
# Create a mask for indices to remove | |
indices_to_remove = sorted_indices_to_remove.scatter(dim=-1, index=sorted_indices, src=sorted_indices_to_remove) | |
probs = probs.masked_fill(indices_to_remove, 0.0) | |
# Renormalize probabilities | |
probs = probs / probs.sum(dim=-1, keepdim=True) | |
# If top_p is None, apply top-k sampling | |
elif top_k: | |
top_probs, top_indices = torch.topk(probs, top_k) | |
probs = torch.zeros_like(probs).scatter_(-1, top_indices, top_probs) | |
# Renormalize probabilities | |
probs = probs / probs.sum(dim=-1, keepdim=True) | |
# Sample from the filtered distribution | |
if temperature > 0.0: | |
idx_next = torch.multinomial(probs, num_samples=1) | |
else: | |
idx_next = torch.argmax(probs, dim=-1, keepdim=True) | |
if idx_next == eos_id: | |
break | |
idx = torch.cat((idx, idx_next), dim=1) | |
text = token_ids_to_text(idx, tokenizer) | |
return text | |
# ==================================================================================- | |
# ==================================================================================- | |
# ==================================================================================- | |
# ==================================================================================- | |
# ==================================================================================- | |
# Load model and tokenizer once at startup | |
model, tokenizer = load_model_n_tokenizer() | |
model.eval() | |
import sys | |
def generate_text(prompt, max_new_tokens, top_k, top_p, temperature, repetition_penalty, penalize_len_below): | |
device = next(model.parameters()).device | |
# Convert top_k to None if using top_p | |
if top_p > 0: | |
top_k = None | |
else: | |
top_p = None | |
with torch.no_grad(): | |
if top_k!=None: | |
# it expects top_k to be integer not float | |
top_k = int(top_k) | |
output_text = generate( # function uses `with torch.no_grad()` internally already | |
model=model, | |
prompt=prompt, | |
tokenizer=tokenizer, | |
max_new_tokens=max_new_tokens, | |
top_p=top_p,# top p sampling is prefered over top k if top_p != None | |
top_k=top_k, | |
temperature=0.7, | |
repetition_penalty=repetition_penalty, # New parameter: Repetition penalty factor | |
penalize_len_below=penalize_len_below # New parameter: Minimum content length for penalizing EOT token. | |
) | |
return output_text | |
css = """ | |
#bright-textbox { | |
background-color: #ffeb3b; /* Bright yellow */ | |
color: #000000; /* Black text for contrast */ | |
border: 2px solid #fbc02d; /* Slightly darker yellow for the border */ | |
font-size: 16px; | |
padding: 10px; | |
border-radius: 5px; | |
} | |
""" | |
# Create Gradio interface | |
with gr.Blocks(title="Nepali GPT-2 Text Generator", css=css) as interface: | |
gr.Markdown("# Nepali GPT-2 Text Generator") | |
gr.Markdown("Enter Nepali (नेपाली) text to generate content using the custom GPT2-Nepali model.") | |
with gr.Row(): | |
with gr.Column(): | |
prompt = gr.Textbox( | |
label="Prompt", | |
placeholder="यहाँ नेपाली मा इन्पुट दिनु होस् ... (please Enter Nepali text here...)" #, | |
# value="रामले भात" | |
) | |
max_tokens = gr.Slider(minimum=1, maximum=512, value=50, step=1, label="Max New Tokens") | |
with gr.Row(): | |
with gr.Column(): | |
temperature = gr.Slider(minimum=0.1, maximum=2.0, value=0.7, step=0.1, label="Temperature") | |
repetition_penalty = gr.Slider(minimum=1.0, maximum=2.0, value=1.2, step=0.1, label="Repetition Penalty") | |
with gr.Column(): | |
top_k = gr.Slider(minimum=0, maximum=100, value=50, step=1, label="Top K (set to 0 to use Top P)") | |
top_p = gr.Slider(minimum=0, maximum=1.0, value=0.9, step=0.05, label="Top P (set above 0 to use instead of Top K)") | |
min_length = gr.Slider(minimum=1, maximum=200, value=50, step=1, label="Minimum Length Penalty") | |
generate_btn = gr.Button("Generate Text") | |
with gr.Column(): | |
output = gr.Textbox(label="Generated Text", lines=10) | |
# Add examples if you have any | |
gr.Examples( | |
examples=[ | |
["रामले भात", 50, 50, 0, 0.7, 1.2, 50], | |
["नेपाल एउटा", 100, 0, 0.9, 0.8, 1.2, 100], | |
["नेपाल का वर्तमान प्रधानमन्त्री ", 100, 0, 0.9, 0.8, 1.2, 100], | |
["भारतीय प्रधानमन्त्री ", 100, 0, 0.9, 0.8, 1.2, 100], | |
["अमिरिकी रास्ट्रपति डोनाल्ड", 100, 0, 0.9, 0.8, 1.2, 100], | |
], | |
inputs=[prompt, max_tokens, top_k, top_p, temperature, repetition_penalty, min_length], | |
outputs=output, | |
fn=generate_text, | |
cache_examples=True, | |
) | |
generate_btn.click( | |
fn=generate_text, | |
inputs=[prompt, max_tokens, top_p, top_k, temperature, repetition_penalty, min_length], | |
outputs=output | |
) | |
''' | |
''' | |
interface.launch() |