import json
import os
import random
import string
import time

import numpy as np
import torch
import tritonclient.grpc as client_util
from tokenizers import Tokenizer
from tritonclient.utils import np_to_triton_dtype, InferenceServerException
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline

np.finfo(np.dtype("float32"))
np.finfo(np.dtype("float64"))

token = os.environ.get("HUB_TOKEN", None)
device = "cuda:0" if torch.cuda.is_available() else "cpu"

# Local HuggingFace pipeline, loaded once at import time and used by generate().
tokenizer = AutoTokenizer.from_pretrained("bigcode/christmas-models", use_auth_token=token)
model = AutoModelForCausalLM.from_pretrained(
    "bigcode/christmas-models", trust_remote_code=True, use_auth_token=token
).to(device)
pipe = pipeline("text-generation", model=model, tokenizer=tokenizer, device=device)


class CodeGenProxy:
    def __init__(self, host: str = 'triton', port: int = 8001, verbose: bool = False):
        self.tokenizer = AutoTokenizer.from_pretrained("bigcode/christmas-models", use_auth_token=token)
        self.client = client_util.InferenceServerClient(url=f'{host}:{port}', verbose=verbose)
        self.PAD_CHAR = 50256

        # Max number of tokens the model can handle
        self.MAX_MODEL_LEN = 2048

    class TokensExceedsMaximum(Exception):
        pass

    @staticmethod
    def prepare_tensor(name: str, tensor_input):
        t = client_util.InferInput(
            name, tensor_input.shape, np_to_triton_dtype(tensor_input.dtype))
        t.set_data_from_numpy(tensor_input)
        return t

    @staticmethod
    def trim_with_stopwords(output: str, stopwords: list) -> str:
        for w in sorted(stopwords, key=len, reverse=True):
            if output.endswith(w):
                output = output[:-len(w)]
                break
        return output

    @staticmethod
    def to_word_list_format(word_dict, tokenizer):
        # Convert lists of stop words into a padded array of token ids and
        # cumulative offsets with shape (batch, 2, max_len).
        flat_ids = []
        offsets = []
        for word_dict_item in word_dict:
            item_flat_ids = []
            item_offsets = []

            for word in word_dict_item:
                ids = tokenizer.encode(word)

                if len(ids) == 0:
                    continue

                item_flat_ids += ids
                item_offsets.append(len(ids))

                # Hack, can we do this better?
                if word == '\n\n':
                    item_flat_ids += [198, 198]
                    item_offsets.append(2)

            flat_ids.append(np.array(item_flat_ids))
            offsets.append(np.cumsum(np.array(item_offsets)))

        pad_to = max(1, max(len(ids) for ids in flat_ids))

        for i, (ids, offs) in enumerate(zip(flat_ids, offsets)):
            flat_ids[i] = np.pad(ids, (0, pad_to - len(ids)), constant_values=0)
            offsets[i] = np.pad(offs, (0, pad_to - len(offs)), constant_values=-1)

        return np.array([flat_ids, offsets], dtype="int32").transpose((1, 0, 2))

    def generate(self, data):
        # Generation goes through the local transformers pipeline rather than Triton.
        global pipe
        prompt = data['prompt']
        n = data.get('n', 1)
        model_name = data["model"]

        choices = []
        text = pipe(prompt, do_sample=True, top_p=0.95, max_new_tokens=50)[0]['generated_text']
        choice = {
            'text': text,
            'index': 0,
            'finish_reason': "stop",
            'logprobs': None,
        }
        choices.append(choice)

        completion = {
            'id': None,  # fill in
            'model': 'codegen',
            'object': 'text_completion',
            'created': int(time.time()),
            'choices': None,  # fill in
            'usage': {
                # Placeholder token counts; not computed from the actual prompt/completion.
                'completion_tokens': int(50),
                'prompt_tokens': int(50),
                'total_tokens': int(100),
            }
        }
        return completion, choices

    @staticmethod
    def random_completion_id():
        return 'cmpl-' + ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(29))

    def streamed_response(self, completion, choices):
        for c in choices:
            completion['id'] = self.random_completion_id()
            completion['choices'] = [c]
            yield f'data: {json.dumps(completion)}\n\n'
        yield 'data: [DONE]\n\n'

    def non_streamed_response(self, completion, choices) -> str:
        completion['id'] = self.random_completion_id()
        completion['choices'] = choices
        return json.dumps(completion)

    def __call__(self, data: dict):
        st = time.time()
        try:
            completion, choices = self.generate(data)
        except InferenceServerException as E:
            print(E)
            completion = {}
            choices = []
        ed = time.time()
        print(f"Returned completion in {(ed - st) * 1000} ms")
        if data.get('stream', False):
            return self.streamed_response(completion, choices)
        else:
            return self.non_streamed_response(completion, choices)
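

# --- Usage sketch (not part of the original module; a minimal smoke test) ---
# Assumptions: HUB_TOKEN grants access to bigcode/christmas-models, the model fits
# on the selected device, and no Triton server needs to be reachable, since
# generate() above only uses the local `pipe`. The request keys mirror what
# generate() and __call__ read: 'prompt', 'model', 'n', and 'stream'. The prompt
# below is purely illustrative.
if __name__ == '__main__':
    proxy = CodeGenProxy()
    request = {
        'prompt': 'def fibonacci(n):',
        'model': 'codegen',
        'n': 1,
        'stream': False,
    }
    # Non-streamed path: returns a JSON string with 'id' and 'choices' filled in.
    print(proxy(request))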