|
|
|
|
|
"""This is a class called HFDecoderModel which is a wrapper around transformers model and |
|
tokenizer classes. It has several methods such as __init__, tokenize, and train that are |
|
used for training and fine-tuning the model. The __init__ method takes in several arguments |
|
such as model_args, tune_strategy, and ds_config, which are used to load the pretrained |
|
model and tokenizer, and initialize the training settings. |
|
|
|
The tokenize method is used to tokenize the input text and return the input IDs and attention |
|
masks that can be fed to the model for training or inference. |
|
|
|
This class supports different tune_strategy options such as 'normal', 'none', 'lora', and |
|
'adapter', which allow for different fine-tuning settings of the model. However, the 'lora' |
|
and 'adapter' strategies are not yet implemented. |
|
|
|
Overall, this class provides a convenient interface for loading and fine-tuning transformer |
|
models and can be used for various NLP tasks such as language modeling, text classification, |
|
and question answering. |
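
A minimal usage sketch (illustrative only; it assumes a ModelArguments object from
lmflow.args with model_name_or_path set, and a CPU run without DeepSpeed):

    import torch
    from lmflow.args import ModelArguments

    model_args = ModelArguments(model_name_or_path="gpt2")
    model = HFDecoderModel(model_args, tune_strategy='none', device="cpu")
    token_ids = model.encode("Hello, world")
    output_ids = model.inference(torch.tensor([token_ids]), max_new_tokens=16)
    print(model.decode(output_ids[0]))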
|
""" |
|
|
|
import logging |
|
from typing import List, Union |
|
|
|
import deepspeed |
|
|
|
from peft import ( |
|
LoraConfig, |
|
PeftModel, |
|
TaskType, |
|
get_peft_config, |
|
get_peft_model, |
|
) |
|
|
|
import torch |
|
import transformers |
|
from transformers.deepspeed import HfDeepSpeedConfig |
|
|
|
from transformers.testing_utils import CaptureLogger |
|
|
|
from transformers import ( |
|
CONFIG_MAPPING, |
|
AutoConfig, |
|
AutoTokenizer, |
|
AutoModelForCausalLM, |
|
) |
|
|
|
from lmflow.datasets.dataset import Dataset |
|
from lmflow.models.decoder_model import DecoderModel |
|
from lmflow.models.interfaces.tunable import Tunable |
|
from lmflow.utils.constants import ( |
|
TEXT_ONLY_DATASET_DESCRIPTION, |
|
TEXT2TEXT_DATASET_DESCRIPTION, |
|
) |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
class HFDecoderModel(DecoderModel, Tunable): |
|
r""" |
|
Initializes a HFDecoderModel instance. |
|
|
|
Parameters |
|
------------ |
|
|
|
model_args : |
|
Model arguments such as model name, path, revision, etc. |
|
|
|
    tune_strategy : str, default="normal".
        The tuning strategy, one of "normal", "none", or "adapter".

    ds_config :
        DeepSpeed configurations.

    device : str, default="gpu".
        The device to run the model on, either "gpu" or "cpu".
|
|
|
args : Optional. |
|
Positional arguments. |
|
|
|
kwargs : Optional. |
|
Keyword arguments. |
|
""" |
|
|
|
def __init__( |
|
self, |
|
model_args, |
|
tune_strategy='normal', |
|
ds_config=None, |
|
device="gpu", |
|
*args, |
|
**kwargs |
|
): |
|
""" |
|
Initializes a HFDecoderModel instance. |
|
:param model_args: dictionary with model arguments such as model name, path, revision, etc. |
|
        :param tune_strategy: tuning strategy: normal, none, or adapter
        :param ds_config: deepspeed configuration for distributed training
        :param device: device to run the model on, "gpu" or "cpu"
|
""" |
|
self.device = device |
|
self.model_args = model_args |
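        # Resolve the requested dtype: "auto"/None are passed straight through
        # to from_pretrained; any other string (e.g. "float16") is mapped to
        # the corresponding torch dtype object.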
|
torch_dtype = ( |
|
model_args.torch_dtype |
|
if model_args.torch_dtype in ["auto", None] |
|
else getattr(torch, model_args.torch_dtype) |
|
) |
|
if tune_strategy == 'normal': |
|
config_kwargs = { |
|
"cache_dir": model_args.cache_dir, |
|
"revision": model_args.model_revision, |
|
"use_auth_token": True if model_args.use_auth_token else None, |
|
} |
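            # Resolve the model config: an explicit config name wins, then the
            # checkpoint's own config; otherwise fall back to a fresh config
            # of the given model type (training from scratch).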
|
if model_args.config_name: |
|
config = AutoConfig.from_pretrained(model_args.config_name, **config_kwargs) |
|
elif model_args.model_name_or_path: |
|
config = AutoConfig.from_pretrained(model_args.model_name_or_path, **config_kwargs) |
|
else: |
|
config = CONFIG_MAPPING[model_args.model_type]() |
|
logger.warning("You are instantiating a new config instance from scratch.") |
|
if model_args.config_overrides is not None: |
|
logger.info(f"Overriding config: {model_args.config_overrides}") |
|
config.update_from_string(model_args.config_overrides) |
|
logger.info(f"New config: {config}") |
|
|
|
tokenizer_kwargs = { |
|
"cache_dir": model_args.cache_dir, |
|
"use_fast": model_args.use_fast_tokenizer, |
|
"revision": model_args.model_revision, |
|
"use_auth_token": True if model_args.use_auth_token else None, |
|
} |
|
if model_args.tokenizer_name: |
|
tokenizer = AutoTokenizer.from_pretrained(model_args.tokenizer_name, **tokenizer_kwargs) |
|
elif model_args.model_name_or_path: |
|
tokenizer = AutoTokenizer.from_pretrained(model_args.model_name_or_path, **tokenizer_kwargs) |
|
else: |
|
raise ValueError( |
|
"You are instantiating a new tokenizer from scratch. This is" |
|
" not supported by this script. You can do it from another" |
|
" script, save it, and load it from here, using" |
|
" --tokenizer_name." |
|
) |
|
|
|
if model_args.model_name_or_path: |
|
model = AutoModelForCausalLM.from_pretrained( |
|
model_args.model_name_or_path, |
|
from_tf=bool(".ckpt" in model_args.model_name_or_path), |
|
config=config, |
|
cache_dir=model_args.cache_dir, |
|
revision=model_args.model_revision, |
|
use_auth_token=True if model_args.use_auth_token else None, |
|
torch_dtype=torch_dtype, |
|
) |
|
else: |
|
model = AutoModelForCausalLM.from_config(config) |
|
n_params = sum(dict((p.data_ptr(), p.numel()) for p in model.parameters()).values()) |
|
logger.info(f"Training new model from scratch - Total size={n_params/2**20:.2f}M params") |
|
self.backend_model_full = model |
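            # Keep a handle to the full (unwrapped) model so it can be saved
            # later; with LoRA enabled, `model` below becomes a PEFT wrapper
            # that trains only the low-rank adapter weights.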
|
if model_args.use_lora: |
|
if model_args.lora_target_modules: |
|
lora_target_modules = model_args.lora_target_modules |
|
else: |
|
lora_target_modules = None |
|
peft_config = LoraConfig( |
|
task_type=TaskType.CAUSAL_LM, |
|
inference_mode=False, |
|
r=model_args.lora_r, |
|
lora_alpha=model_args.lora_alpha, |
|
lora_dropout=model_args.lora_dropout, |
|
target_modules=lora_target_modules, |
|
) |
|
model = get_peft_model(model, peft_config) |
|
model.print_trainable_parameters() |
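
            # Resize the token embeddings only if the tokenizer's vocabulary
            # outgrew the model's embedding matrix, to avoid index errors.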
embedding_size = model.get_input_embeddings().weight.shape[0] |
|
if len(tokenizer) > embedding_size: |
|
model.resize_token_embeddings(len(tokenizer)) |
|
|
|
self.config = config |
|
self.backend_model = model |
|
self.tokenizer = tokenizer |
|
self.tune_strategy = tune_strategy |
|
|
|
elif tune_strategy == 'none': |
|
|
|
            # Keep a live HfDeepSpeedConfig reference before from_pretrained
            # so DeepSpeed ZeRO-3 (when configured) can partition weights at
            # load time; without this the import above is unused.
            dschf = HfDeepSpeedConfig(ds_config)
            peft_model_id = model_args.lora_model_path
|
|
|
if "llama" in model_args.model_name_or_path and model_args.use_ram_optimized_load: |
|
logger.warning( |
|
"llama does not support RAM optimized load. Automatically" |
|
" use original load instead." |
|
) |
|
model_args.use_ram_optimized_load = False |
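
            # "RAM optimized" load uses accelerate's device_map="auto" with
            # disk offload to keep peak host memory low for large checkpoints;
            # it is skipped for LoRA models (and llama, per the check above).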
|
|
|
if model_args.use_ram_optimized_load and peft_model_id is None: |
|
try: |
|
|
|
self.backend_model = AutoModelForCausalLM.from_pretrained( |
|
model_args.model_name_or_path, |
|
device_map="auto", |
|
offload_folder="offload", |
|
offload_state_dict=True, |
|
torch_dtype=torch_dtype, |
|
) |
|
                except Exception:
|
logger.warning( |
|
"Failed to use RAM optimized load. Automatically" |
|
" use original load instead." |
|
) |
|
|
|
self.backend_model = AutoModelForCausalLM.from_pretrained( |
|
model_args.model_name_or_path, |
|
torch_dtype=torch_dtype, |
|
) |
|
else: |
|
if peft_model_id is not None: |
|
logger.warning( |
|
"LoRA does not support RAM optimized load currently." |
|
" Automatically use original load instead." |
|
) |
|
self.backend_model = AutoModelForCausalLM.from_pretrained( |
|
model_args.model_name_or_path, |
|
torch_dtype=torch_dtype, |
|
) |
|
|
|
self.tokenizer = AutoTokenizer.from_pretrained(model_args.model_name_or_path) |
|
self.backend_model_full = self.backend_model |
|
if peft_model_id is not None: |
|
self.backend_model = PeftModel.from_pretrained( |
|
self.backend_model, peft_model_id |
|
) |
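
            # For GPU inference, wrap the model in a DeepSpeed inference
            # engine; generation then goes through self.ds_engine.module.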
|
|
|
if device == "gpu": |
|
deepspeed.init_distributed() |
|
self.ds_engine = deepspeed.initialize(model=self.backend_model, config_params=ds_config)[0] |
|
self.ds_engine.module.eval() |
|
|
|
elif tune_strategy == 'adapter': |
|
raise NotImplementedError('adapter tune strategy not implemented') |
|
|
|
|
|
def tokenize(self, dataset, add_special_tokens=True, *args, **kwargs): |
|
""" |
|
Tokenize the full dataset. |
|
|
|
Parameters |
|
------------ |
|
dataset : lmflow.datasets.Dataset. |
|
|
|
args : Optional. |
|
Positional arguments. |
|
|
|
kwargs : Optional. |
|
Keyword arguments. |
|
|
|
Returns |
|
------------ |
|
tokenized_datasets : |
|
            The tokenized dataset; leading or trailing special tokens
            (normally Begin-Of-Sentence or End-Of-Sentence tokens) are only
            added when add_special_tokens is True.
|
""" |
|
|
|
|
|
if dataset.get_backend() != "huggingface": |
|
raise NotImplementedError( |
|
"tokenization of datasets with non-huggingface backend are" |
|
"not supported yet" |
|
) |
|
|
|
dataset_type = dataset.get_type() |
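
        # Tokenization needs three pieces of information per dataset type:
        #   1) which fields to tokenize,
        #   2) the order in which tokenized fields are concatenated,
        #   3) which fields contribute to the loss (the rest are masked).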
tokenized_column_order = None |
|
label_columns = None |
|
if dataset_type == "text_only": |
|
tokenized_column_order = ["text"] |
|
label_columns = ["text"] |
|
elif dataset_type == "text2text": |
|
tokenized_column_order = ["input", "output"] |
|
label_columns = ["output"] |
|
else: |
|
raise NotImplementedError( |
|
f"dataset type \"{dataset_type}\" is not supported, currently" |
|
" only support following data types:\n" |
|
f" 1) {TEXT_ONLY_DATASET_DESCRIPTION}\n" |
|
f" 2) {TEXT2TEXT_DATASET_DESCRIPTION}\n" |
|
) |
|
|
|
model_args = self.model_args |
|
raw_datasets = dataset |
|
hf_raw_datasets = dataset.get_backend_dataset() |
|
column_names = list(hf_raw_datasets.features) |
|
|
|
|
|
|
|
tok_logger = transformers.utils.logging.get_logger("transformers.tokenization_utils_base") |
|
|
|
def tokenize_function(examples): |
|
num_example = len(examples[column_names[0]]) |
|
token_dict = { |
|
"input_ids": [[] for _ in range(num_example)], |
|
"attention_mask": [[] for _ in range(num_example)], |
|
"labels": [[] for _ in range(num_example)], |
|
} |
|
with CaptureLogger(tok_logger) as cl: |
|
for column_name in tokenized_column_order: |
|
encoding = self.tokenizer( |
|
examples[column_name], |
|
add_special_tokens=add_special_tokens, |
|
truncation=True if model_args.use_lora else None, |
|
) |
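
                    # Label columns keep their token ids as training targets;
                    # all other columns are masked with -100 so the loss
                    # ignores them (the Hugging Face convention).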
|
|
|
if column_name in label_columns: |
|
labels = encoding["input_ids"].copy() |
|
else: |
|
labels = [ |
|
[-100] * len(encoding["input_ids"][i]) |
|
for i in range(num_example) |
|
] |
|
|
|
for i in range(num_example): |
|
token_dict["input_ids"][i].extend( |
|
encoding["input_ids"][i] |
|
) |
|
token_dict["attention_mask"][i].extend( |
|
encoding["attention_mask"][i] |
|
) |
|
token_dict["labels"][i].extend(labels[i]) |
|
|
|
|
|
if "Token indices sequence length is longer than the" in cl.out: |
|
tok_logger.warning( |
|
"^^^^^^^^^^^^^^^^ Please ignore the warning above - this long input will be chunked into smaller bits" |
|
" before being passed to the model." |
|
) |
|
return token_dict |
|
|
|
data_args = raw_datasets.get_data_args() |
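        # Streaming (iterable) datasets do not accept num_proc or
        # cache-related arguments in map(), hence the two branches below.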
|
if not data_args.streaming: |
|
tokenized_datasets = raw_datasets.map( |
|
tokenize_function, |
|
batched=True, |
|
num_proc=data_args.preprocessing_num_workers, |
|
remove_columns=column_names, |
|
load_from_cache_file=not data_args.overwrite_cache, |
|
desc="Running tokenizer on dataset", |
|
) |
|
else: |
|
tokenized_datasets = raw_datasets.map( |
|
tokenize_function, |
|
batched=True, |
|
remove_columns=column_names, |
|
) |
|
return tokenized_datasets |
|
|
|
|
|
    def encode(self, input: Union[str, List[str]], *args, **kwargs) -> Union[List[int], List[List[int]]]:
|
""" |
|
Perform encoding process of the tokenizer. |
|
|
|
Parameters |
|
------------ |
|
        input : str or list.
|
The text sequence. |
|
|
|
args : Optional. |
|
Positional arguments. |
|
|
|
kwargs : Optional. |
|
Keyword arguments. |
|
|
|
Returns |
|
------------ |
|
outputs : |
|
The tokenized inputs. |
|
""" |
|
if isinstance(input, list): |
|
output = [] |
|
for single_input in input: |
|
single_output = self.encode(single_input, *args, **kwargs) |
|
output.append(single_output) |
|
return output |
|
elif isinstance(input, str): |
|
return self.tokenizer.encode(text=input, *args, **kwargs) |
|
else: |
|
raise NotImplementedError(f'type "{type(input)}" cannot be encoded') |
|
|
|
|
|
    def decode(self, input, *args, **kwargs) -> Union[str, List[str]]:
|
""" |
|
Perform decoding process of the tokenizer. |
|
|
|
Parameters |
|
------------ |
|
        input : list.
|
The token sequence. |
|
|
|
args : Optional. |
|
Positional arguments. |
|
|
|
kwargs : Optional. |
|
Keyword arguments. |
|
|
|
Returns |
|
------------ |
|
outputs : |
|
The text decoded from the token inputs. |
|
""" |
|
if isinstance(input, list) and input and isinstance(input[0], list): |
|
output = [] |
|
for single_input in input: |
|
single_output = self.decode(single_input, *args, **kwargs) |
|
output.append(single_output) |
|
return output |
|
else: |
|
|
|
return self.tokenizer.decode(input, *args, **kwargs) |
|
|
|
|
|
def inference(self, inputs, *args, **kwargs): |
|
""" |
|
Perform generation process of the model. |
|
|
|
Parameters |
|
------------ |
|
inputs : |
|
The sequence used as a prompt for the generation or as model inputs to the model. |
|
|
|
args : Optional. |
|
Positional arguments. |
|
|
|
kwargs : Optional. |
|
Keyword arguments. |
|
|
|
Returns |
|
------------ |
|
outputs : |
|
The generated sequence output |
|
""" |
|
|
|
|
|
with torch.no_grad(): |
|
if self.device == "gpu": |
|
outputs = self.ds_engine.module.generate( |
|
input_ids=inputs, |
|
synced_gpus=True, |
|
pad_token_id=self.tokenizer.eos_token_id, |
|
*args, |
|
**kwargs |
|
) |
|
elif self.device == "cpu": |
|
outputs = self.backend_model.generate( |
|
input_ids=inputs, |
|
                    synced_gpus=False,  # single CPU process: no cross-GPU sync
|
pad_token_id=self.tokenizer.eos_token_id, |
|
*args, |
|
**kwargs |
|
) |
|
else: |
|
raise NotImplementedError( |
|
f"device \"{self.device}\" is not supported" |
|
) |
|
return outputs |
|
|
|
|
|
    def merge_lora_weights(self):
        """
        Merge the trained LoRA weights back into the base model in place.
        """
        if self.model_args.use_lora:
            # merge_and_unload() folds the adapter weights into the base
            # model's weights, so backend_model_full also sees the merge.
            self.get_backend_model().merge_and_unload()
        else:
            logger.warning("LoRA training is NOT enabled. Merging LoRA weights is not applicable.")
|
|
|
|
|
def save(self, dir, save_full_model=False, *args, **kwargs): |
|
""" |
|
Perform generation process of the model. |
|
|
|
Parameters |
|
------------ |
|
dir : |
|
The directory to save model and tokenizer |
|
|
|
save_full_model : Optional. |
|
Whether to save full model. |
|
|
|
kwargs : Optional. |
|
Keyword arguments. |
|
|
|
Returns |
|
------------ |
|
outputs : |
|
The generated sequence output |
|
""" |
|
self.get_tokenizer().save_pretrained(dir) |
|
if save_full_model and self.model_args.use_lora: |
|
self.backend_model_full.save_pretrained(dir) |
|
else: |
|
self.get_backend_model().save_pretrained(dir) |
|
|
|
|
|
def get_max_length(self): |
|
""" |
|
Return max acceptable input length in terms of tokens. |
|
""" |
|
return self.tokenizer.model_max_length |
|
|
|
|
|
def get_tokenizer(self): |
|
""" |
|
Return the tokenizer of the model. |
|
""" |
|
return self.tokenizer |
|
|
|
|
|
def get_backend_model(self): |
|
""" |
|
Return the backend model. |
|
""" |
|
return self.backend_model |
|
|