import json
import random
import string
import time
import os

import torch
import numpy as np
import tritonclient.grpc as client_util
from tokenizers import Tokenizer
from tritonclient.utils import np_to_triton_dtype, InferenceServerException
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline

# Pre-initialize numpy's floating-point type info caches.
np.finfo(np.dtype("float32"))
np.finfo(np.dtype("float64"))

# Hugging Face Hub token for the gated model repository.
token = os.environ.get("HUB_TOKEN", None)
device = "cuda:0" if torch.cuda.is_available() else "cpu"

# Load the model and tokenizer once at startup and wrap them in a
# text-generation pipeline shared by all requests.
tokenizer = AutoTokenizer.from_pretrained("bigcode/christmas-models", use_auth_token=token)
model = AutoModelForCausalLM.from_pretrained(
    "bigcode/christmas-models", trust_remote_code=True, use_auth_token=token
).to(device)
pipe = pipeline("text-generation", model=model, tokenizer=tokenizer, device=device)
class CodeGenProxy:
    def __init__(self, host: str = 'triton', port: int = 8001, verbose: bool = False):
        self.tokenizer = AutoTokenizer.from_pretrained("bigcode/christmas-models", use_auth_token=token)
        self.client = client_util.InferenceServerClient(url=f'{host}:{port}', verbose=verbose)
        self.PAD_CHAR = 50256
        # Max number of tokens the model can handle
        self.MAX_MODEL_LEN = 2048

    class TokensExceedsMaximum(Exception):
        pass
    @staticmethod
    def prepare_tensor(name: str, tensor_input):
        # Wrap a numpy array in a Triton InferInput with the matching dtype.
        t = client_util.InferInput(
            name, tensor_input.shape, np_to_triton_dtype(tensor_input.dtype))
        t.set_data_from_numpy(tensor_input)
        return t
    @staticmethod
    def trim_with_stopwords(output: str, stopwords: list) -> str:
        # Strip the longest matching stop word from the end of the output.
        for w in sorted(stopwords, key=len, reverse=True):
            if output.endswith(w):
                output = output[:-len(w)]
                break
        return output
    @staticmethod
    def to_word_list_format(word_dict, tokenizer):
        # Encode each list of stop words into padded token ids plus cumulative
        # offsets, returned as an int32 array of shape (batch, 2, max_len).
        flat_ids = []
        offsets = []
        for word_dict_item in word_dict:
            item_flat_ids = []
            item_offsets = []

            for word in word_dict_item:
                ids = tokenizer.encode(word)

                if len(ids) == 0:
                    continue

                item_flat_ids += ids
                item_offsets.append(len(ids))

                # Hack, can we do this better?
                if word == '\n\n':
                    item_flat_ids += [198, 198]
                    item_offsets.append(2)

            flat_ids.append(np.array(item_flat_ids))
            offsets.append(np.cumsum(np.array(item_offsets)))

        pad_to = max(1, max(len(ids) for ids in flat_ids))

        for i, (ids, offs) in enumerate(zip(flat_ids, offsets)):
            flat_ids[i] = np.pad(ids, (0, pad_to - len(ids)), constant_values=0)
            offsets[i] = np.pad(offs, (0, pad_to - len(offs)), constant_values=-1)

        return np.array([flat_ids, offsets], dtype="int32").transpose((1, 0, 2))
    def generate(self, data):
        global pipe
        prompt = data['prompt']
        n = data.get('n', 1)
        model_name = data["model"]

        choices = []
        # Sample a single completion from the shared pipeline.
        text = pipe(prompt, do_sample=True, top_p=0.95, max_new_tokens=50)[0]['generated_text']

        choice = {
            'text': text,
            'index': 0,
            'finish_reason': "stop",
            'logprobs': None,
        }
        choices.append(choice)

        completion = {
            'id': None,  # fill in
            'model': 'codegen',
            'object': 'text_completion',
            'created': int(time.time()),
            'choices': None,  # fill in
            'usage': {
                # Placeholder token counts; not computed from the actual request.
                'completion_tokens': int(50),
                'prompt_tokens': int(50),
                'total_tokens': int(100),
            }
        }
        return completion, choices
    @staticmethod
    def random_completion_id():
        # OpenAI-style completion id: 'cmpl-' followed by 29 random alphanumerics.
        return 'cmpl-' + ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(29))
    def streamed_response(self, completion, choices):
        # Emit each choice as a server-sent event, then the [DONE] sentinel.
        for c in choices:
            completion['id'] = self.random_completion_id()
            completion['choices'] = [c]
            yield f'data: {json.dumps(completion)}\n\n'
        yield 'data: [DONE]\n\n'
    def non_streamed_response(self, completion, choices) -> str:
        completion['id'] = self.random_completion_id()
        completion['choices'] = choices
        return json.dumps(completion)
    def __call__(self, data: dict):
        st = time.time()
        try:
            completion, choices = self.generate(data)
        except InferenceServerException as E:
            # Log the inference error and fall back to an empty completion.
            print(E)
            completion = {}
            choices = []
        ed = time.time()
        print(f"Returned completion in {(ed - st) * 1000} ms")

        if data.get('stream', False):
            return self.streamed_response(completion, choices)
        else:
            return self.non_streamed_response(completion, choices)
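
# --- Illustrative usage sketch (not part of the original Space code) ---
# A minimal example, assuming the globals above loaded successfully (valid
# HUB_TOKEN, model access). The request fields ('prompt', 'n', 'model',
# 'stream') mirror what generate() and __call__ read. The Triton client
# created in __init__ is not used by the pipeline-based generate() path.
if __name__ == "__main__":
    proxy = CodeGenProxy()
    request = {
        'prompt': 'def hello_world():',
        'n': 1,
        'model': 'codegen',
        'stream': False,
    }
    # Non-streamed mode returns a JSON string with the OpenAI-style completion.
    print(proxy(request))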