import os
from shutil import copyfile
from typing import List, Optional, Tuple

import sentencepiece as spm
import torch
from transformers import PreTrainedTokenizer
from transformers.tokenization_utils_base import BatchEncoding
from transformers.utils import logging

logger = logging.get_logger(__name__)

VOCAB_FILES_NAMES = {"vocab_file": "cog-pretrain.model"}


class GLMChineseTokenizer(PreTrainedTokenizer):
    vocab_files_names = VOCAB_FILES_NAMES

    def __init__(self, vocab_file, **kwargs):
        # Keep the path around so `save_vocabulary` can copy the original file later.
        self.vocab_file = vocab_file
        # Load the SentencePiece model before calling the parent constructor, which
        # may query `get_vocab()` during initialization on recent `transformers` versions.
        self.sp_model = spm.SentencePieceProcessor()
        self.sp_model.Load(vocab_file)
        super().__init__(**kwargs)

    @property
    def vocab_size(self):
        return len(self.sp_model)

    def get_vocab(self):
        vocab = {self.convert_ids_to_tokens(i): i for i in range(self.vocab_size)}
        vocab.update(self.added_tokens_encoder)
        return vocab

    def _tokenize(self, text, **kwargs):
        return self.sp_model.encode(text, out_type=str)

    def _convert_token_to_id(self, token):
        """Converts a token (str) to an id using the vocab."""
        return self.sp_model.PieceToId(token)

    def _convert_id_to_token(self, index):
        """Converts an index (integer) to a token (str) using the vocab."""
        return self.sp_model.IdToPiece(index)

    def convert_tokens_to_string(self, tokens):
        return self.sp_model.decode(tokens)

    def save_vocabulary(self, save_directory: str, filename_prefix: Optional[str] = None) -> Tuple[str]:
        if not os.path.isdir(save_directory):
            logger.error(f"Vocabulary path ({save_directory}) should be a directory")
            return
        out_vocab_file = os.path.join(
            save_directory, (filename_prefix + "-" if filename_prefix else "") + VOCAB_FILES_NAMES["vocab_file"]
        )

        if os.path.abspath(self.vocab_file) != os.path.abspath(out_vocab_file) and os.path.isfile(self.vocab_file):
            copyfile(self.vocab_file, out_vocab_file)
        elif not os.path.isfile(self.vocab_file):
            # No original file to copy; serialize the in-memory SentencePiece model instead.
            with open(out_vocab_file, "wb") as fi:
                content_spiece_model = self.sp_model.serialized_model_proto()
                fi.write(content_spiece_model)

        return (out_vocab_file,)

    @property
    def sop_token(self) -> Optional[str]:
        return "<|startofpiece|>"

    @property
    def sop_token_id(self) -> Optional[int]:
        """
        `Optional[int]`: Id of the start token in the vocabulary, used when training a model with autoregressive
        blank filling. Returns `None` if the token has not been set.
        """
        return self.convert_tokens_to_ids(self.sop_token)

    @property
    def eop_token(self) -> Optional[str]:
        return "<|endofpiece|>"

    @property
    def eop_token_id(self) -> Optional[int]:
        """
        `Optional[int]`: Id of the end token in the vocabulary, used when training a model with autoregressive
        blank filling. Returns `None` if the token has not been set.
        """
        return self.convert_tokens_to_ids(self.eop_token)

    @property
    def gmask_token_id(self) -> int:
        return self.convert_tokens_to_ids("[gMASK]")

    @property
    def smask_token_id(self) -> int:
        return self.convert_tokens_to_ids("[sMASK]")
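
    # GLM marks spans to infill with [MASK] (short blank), [sMASK] (sentence blank),
    # or [gMASK] (long, open-ended generation); decoding then starts from the
    # <|startofpiece|> (sop) token and stops at <|endofpiece|> (eop).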

    def build_inputs_for_generation(self, model_input: BatchEncoding, max_gen_length=512):
        mask_ids = [self.mask_token_id, self.smask_token_id, self.gmask_token_id]
        input_ids = model_input.input_ids
        batch_size, seq_length = input_ids.shape[:2]
        # GLM uses two position channels: absolute positions over the prompt, and
        # block positions that count tokens inside the generated span.
        position_id, block_position_id = list(range(seq_length)), [0 for _ in range(seq_length)]
        position_ids, block_position_ids = [], []
        for i in range(batch_size):
            mask_positions = []
            for mask_id in mask_ids:
                mask_positions += (input_ids[i] == mask_id).nonzero(as_tuple=True)[0].tolist()
            if not mask_positions:
                raise ValueError("Cannot find mask token in the input")
            mask_positions.sort()
            mask_pos = mask_positions[0]
            # Generated tokens inherit the absolute position of the first mask token;
            # their block positions run 1..max_gen_length.
            position_ids.append(position_id + [mask_pos] * max_gen_length)
            block_position_ids.append(block_position_id + list(range(1, max_gen_length + 1)))
        position_ids = torch.tensor(position_ids, dtype=input_ids.dtype, device=input_ids.device)
        block_position_ids = torch.tensor(block_position_ids, dtype=input_ids.dtype, device=input_ids.device)
        position_ids = torch.stack((position_ids, block_position_ids), dim=1)
        # Prompt tokens attend bidirectionally within the prompt; generated tokens
        # attend to the whole prompt and causally to earlier generated tokens.
        attention_mask = model_input.attention_mask
        attention_mask = attention_mask.unsqueeze(1).expand(-1, seq_length + max_gen_length, -1)
        generation_attention_mask = torch.cat(
            [
                attention_mask.new_zeros((seq_length, max_gen_length)),
                torch.tril(attention_mask.new_ones((max_gen_length, max_gen_length))),
            ],
            dim=0,
        ).unsqueeze(0).expand(batch_size, -1, -1)
        attention_mask = torch.cat((attention_mask, generation_attention_mask), dim=2)
        attention_mask = attention_mask.unsqueeze(1)
        # Append <|startofpiece|> so decoding starts inside the blank.
        input_ids = torch.cat((input_ids, input_ids.new_full((batch_size, 1), self.sop_token_id)), dim=-1)
        return BatchEncoding(
            {"input_ids": input_ids, "position_ids": position_ids, "generation_attention_mask": attention_mask}
        )
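
    # Shape reference for `build_inputs_for_generation`, derived from the code above
    # (B = batch size, S = prompt length, G = max_gen_length):
    #   input_ids                  -> (B, S + 1)      prompt plus <|startofpiece|>
    #   position_ids               -> (B, 2, S + G)   [absolute, block] channels
    #   generation_attention_mask  -> (B, 1, S + G, S + G)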

    def build_inputs_with_special_tokens(
        self, token_ids_0: List[int], token_ids_1: Optional[List[int]] = None
    ) -> List[int]:
        """
        Build model inputs from a single sequence by concatenating and adding special tokens. A GLM sequence has
        the following format:

        - single sequence: ``[CLS] X <eos>``

        Sequence pairs are not supported; ``token_ids_1`` must be :obj:`None`.

        Args:
            token_ids_0 (:obj:`List[int]`):
                List of IDs to which the special tokens will be added.
            token_ids_1 (:obj:`List[int]`, `optional`):
                Optional second list of IDs for sequence pairs. Must be :obj:`None`.

        Returns:
            :obj:`List[int]`: List of `input IDs <../glossary.html#input-ids>`__ with the appropriate special tokens.
        """
        assert token_ids_1 is None, "GLMChineseTokenizer does not support sequence pairs"
        cls = [self.cls_token_id]
        eos = [self.eos_token_id]
        return cls + token_ids_0 + eos
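

if __name__ == "__main__":
    # Minimal usage sketch. The vocab path and special-token strings below are
    # illustrative assumptions, not values confirmed by this module; match them
    # to the tokenizer_config of your actual checkpoint before running.
    tokenizer = GLMChineseTokenizer(
        vocab_file="cog-pretrain.model",  # assumed to be available locally
        cls_token="[CLS]",
        eos_token="<|endofpiece|>",
        unk_token="[UNK]",
        pad_token="[PAD]",
        mask_token="[MASK]",
        additional_special_tokens=["[gMASK]", "[sMASK]", "<|startofpiece|>", "<|endofpiece|>"],
    )
    prompt = "北京是中国的[gMASK]"  # "Beijing is China's [gMASK]"
    encoded = tokenizer(prompt, return_tensors="pt")
    inputs = tokenizer.build_inputs_for_generation(encoded, max_gen_length=16)
    print(inputs.input_ids.shape)                  # (1, S + 1)
    print(inputs.position_ids.shape)               # (1, 2, S + 16)
    print(inputs.generation_attention_mask.shape)  # (1, 1, S + 16, S + 16)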