|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import json |
|
|
|
import numpy as np |
|
import triton_python_backend_utils as pb_utils |
|
from transformers import AutoTokenizer, LlamaTokenizer, T5Tokenizer |
|
|
|
|
|
class TritonPythonModel:
    """Triton Python-backend model that decodes generated token ids to text.

    Your Python model must use the same class name. Every Python model
    that is created must have "TritonPythonModel" as the class name.
    """

    def initialize(self, args):
        """Load the tokenizer and cache the output dtype.

        `initialize` is called only once when the model is being loaded.
        Implementing `initialize` function is optional. This function allows
        the model to initialize any state associated with this model.

        Parameters
        ----------
        args : dict
          Both keys and values are strings. The dictionary keys and values are:
          * model_config: A JSON string containing the model configuration
          * model_instance_kind: A string containing model instance kind
          * model_instance_device_id: A string containing model instance device ID
          * model_repository: Model repository path
          * model_version: Model version
          * model_name: Model name

        Raises
        ------
        AttributeError
          If the configured ``tokenizer_type`` is not one of
          ``'t5'``, ``'auto'`` or ``'llama'``.
        """
        model_config = json.loads(args['model_config'])
        parameters = model_config['parameters']

        tokenizer_dir = parameters['tokenizer_dir']['string_value']
        tokenizer_type = parameters['tokenizer_type']['string_value']

        # `skip_special_tokens` is optional and defaults to "true"; any of
        # the listed spellings (case-insensitive) enables it.
        self.skip_special_tokens = parameters.get(
            'skip_special_tokens',
            {'string_value': "true"})['string_value'].lower() in [
                'true', '1', 't', 'y', 'yes'
            ]

        if tokenizer_type == 't5':
            self.tokenizer = T5Tokenizer(vocab_file=tokenizer_dir,
                                         padding_side='left')
        elif tokenizer_type == 'auto':
            self.tokenizer = AutoTokenizer.from_pretrained(
                tokenizer_dir, padding_side='left', trust_remote_code=True)
        elif tokenizer_type == 'llama':
            self.tokenizer = LlamaTokenizer.from_pretrained(
                tokenizer_dir, legacy=False, padding_side='left')
        else:
            raise AttributeError(
                f'Unexpected tokenizer type: {tokenizer_type}')
        # Reuse the end-of-sequence token as the padding token.
        self.tokenizer.pad_token = self.tokenizer.eos_token

        # Look up the "OUTPUT" entry of the model config and convert its
        # Triton data type string to the matching numpy dtype.
        output_config = pb_utils.get_output_config_by_name(
            model_config, "OUTPUT")
        self.output_dtype = pb_utils.triton_string_to_numpy(
            output_config['data_type'])

    def execute(self, requests):
        """Decode the tokens of every request and build its response.

        `execute` must be implemented in every Python model. `execute`
        function receives a list of pb_utils.InferenceRequest as the only
        argument. This function is called when an inference is requested
        for this model. Depending on the batching configuration (e.g. Dynamic
        Batching) used, `requests` may contain multiple requests. Every
        Python model, must create one pb_utils.InferenceResponse for every
        pb_utils.InferenceRequest in `requests`. If there is an error, you can
        set the error argument when creating a pb_utils.InferenceResponse.

        Parameters
        ----------
        requests : list
          A list of pb_utils.InferenceRequest

        Returns
        -------
        list
          A list of pb_utils.InferenceResponse. The length of this list must
          be the same as `requests`
        """
        responses = []

        for request in requests:
            # Read the four input tensors of this request as numpy arrays.
            tokens_batch = pb_utils.get_input_tensor_by_name(
                request, 'TOKENS_BATCH').as_numpy()
            sequence_lengths = pb_utils.get_input_tensor_by_name(
                request, 'SEQUENCE_LENGTH').as_numpy()
            cum_log_probs = pb_utils.get_input_tensor_by_name(
                request, 'CUM_LOG_PROBS').as_numpy()
            output_log_probs = pb_utils.get_input_tensor_by_name(
                request, 'OUTPUT_LOG_PROBS').as_numpy()

            # `_postprocessing` returns a single flat list of UTF-8 encoded
            # strings, so it must not be unpacked into several names.
            # NOTE(review): no OUTPUT_LENS tensor is produced here; confirm
            # whether the model config expects one.
            outputs = self._postprocessing(tokens_batch, sequence_lengths)

            output_tensor = pb_utils.Tensor(
                'OUTPUT',
                np.array(outputs).astype(self.output_dtype))

            # The log-probability inputs are forwarded unchanged under
            # their output names.
            out_cum_log_probs = pb_utils.Tensor('OUT_CUM_LOG_PROBS',
                                                cum_log_probs)
            out_output_log_probs = pb_utils.Tensor('OUT_OUTPUT_LOG_PROBS',
                                                   output_log_probs)

            inference_response = pb_utils.InferenceResponse(output_tensors=[
                output_tensor, out_cum_log_probs, out_output_log_probs
            ])
            responses.append(inference_response)

        return responses

    def finalize(self):
        """Announce that the model is being unloaded.

        `finalize` is called only once when the model is being unloaded.
        Implementing `finalize` function is optional. This function allows
        the model to perform any necessary clean ups before exit.
        """
        print('Cleaning up...')

    def _postprocessing(self, tokens_batch, sequence_lengths):
        """Decode token ids into UTF-8 encoded strings.

        Iterates over the first two axes of `tokens_batch` (called batch and
        beam in the code), truncates each token row to the length found at
        `sequence_lengths[batch_idx][beam_idx]` and decodes it with the
        tokenizer, honouring `self.skip_special_tokens`.

        Parameters
        ----------
        tokens_batch : indexable of token rows, addressed as [batch][beam]
        sequence_lengths : indexable of lengths, addressed as [batch][beam]

        Returns
        -------
        list
          One `bytes` object per (batch, beam) pair, in iteration order.
        """
        outputs = []
        for batch_idx, beam_tokens in enumerate(tokens_batch):
            for beam_idx, tokens in enumerate(beam_tokens):
                seq_len = sequence_lengths[batch_idx][beam_idx]
                # Only the first `seq_len` ids are decoded; the rest of the
                # row is ignored.
                output = self.tokenizer.decode(
                    tokens[:seq_len],
                    skip_special_tokens=self.skip_special_tokens)
                outputs.append(output.encode('utf8'))
        return outputs
|
|