""" |
|
A script for preprocessing a chat dataset, using the tokenizer's chat template to construct the tokenized training data.
|
The resulting files are then used for instruction/chat model finetunes (for example, finetuning a model on only the assistant
portions of a ChatML-formatted dataset).
|
|
|
This follows the same output format as 'preprocess_data_with_mask.py' but uses chat templates to generate the data.

This way we can support multi-turn chat data in the finetuning process instead of relying on a single turn of data.
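
Each line of the input jsonl should contain, under every key passed via `--jsonl-keys`
(default: "conversation"), a list of {"role": ..., "content": ...} turns. For example, a single
(hypothetical) record could look like:

```
{"conversation": [
    {"role": "user", "content": "What is the capital of France?"},
    {"role": "assistant", "content": "The capital of France is Paris."}
]}
```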
|
|
|
To run this script, first edit `tools/datasets/corpora.py` such that the command to call |
|
`tools/datasets/preprocess_data_with_chat_template.py` is as follows: |
|
|
|
``` |
|
cmd = f"python tools/datasets/preprocess_data_with_with_chat_template.py \ |
|
--input {jsonl_filepath} \ |
|
--output-prefix {parent_folder}/{self.name} \ |
|
    --tokenizer-path {hf_tokenizer} \
|
--jsonl-keys {jsonl_keys} \ |
|
--dataset-impl mmap \ |
|
--workers {self.num_workers} " |
|
|
|
if self.only_last: |
|
cmd += f"--only-last " |
|
|
|
if self.no_mask: |
|
cmd += f"--no-mask " |
|
``` |
|
|
|
Then, specify |
|
``` |
|
"train_data_paths": ["/path/to/dataset/name_text_document"], |
|
"label_data_paths": ["/path/to/dataset/name_label_document"] |
|
``` |
|
in your YML config. This will then allow for finetuning on the data with loss masks set appropriately. |
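
The script can also be invoked directly; a sketch of a standalone command (the input path,
output prefix, and tokenizer name below are placeholders) would be:

```
python tools/datasets/preprocess_data_with_chat_template.py \
    --input data/sft_chats.jsonl \
    --output-prefix data/sft_chats \
    --tokenizer-path meta-llama/Llama-2-7b-chat-hf \
    --jsonl-keys conversation \
    --dataset-impl mmap \
    --workers 8
```

This writes `{output-prefix}_{key}_document.bin/.idx` files (and, unless --no-mask is given,
matching `{output-prefix}_{key}_label_document` files); the `train_data_paths` /
`label_data_paths` entries above should point at these paths without the .bin/.idx extension.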
|
|
|
""" |
|
|
|
import argparse |
|
import multiprocessing |
|
import os |
|
import sys |
|
|
|
import lm_dataformat as lmd |
|
import numpy as np |
|
|
|
sys.path.append( |
|
os.path.abspath( |
|
os.path.join(os.path.dirname(__file__), os.path.pardir, os.path.pardir) |
|
) |
|
) |
|
|
|
import time |
|
import tqdm |
|
import jsonlines |
|
|
|
from megatron.data import indexed_dataset |
|
from threading import Semaphore |
|
from typing import List, Dict, Tuple |
|
from transformers import AutoTokenizer, PreTrainedTokenizer |
|
|
|
|
|
def build_chat( |
|
chat: List[Dict[str, str]], |
|
generation_role: str, |
|
apply_mask: bool, |
|
tokenizer: PreTrainedTokenizer, |
|
only_last_turn: bool = False, |
|
for_rm: bool = False, |
|
) -> Tuple[List[int], List[int]]: |
|
""" |
|
    Build a chat from a list of dictionaries. Each dictionary should have a "role" and "content" key; this follows the
|
Chat Template from https://huggingface.co/docs/transformers/main/en/chat_templating |
|
|
|
    :param chat: A list of dictionaries with "role" and "content" keys
    :param generation_role: The role of the model generating the chat, usually "assistant"
    :param apply_mask: Whether to apply a loss mask to the chat; if False, all tokens are included in the loss
    :param tokenizer: A HF tokenizer
    :param only_last_turn: Whether to compute the loss only on the last turn of the chat, needed for some fine-tuning tasks
    :param for_rm: Whether the chat is being tokenized for reward-model training; masks everything except the appended EOS token
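
    Example (illustrative, with made-up token ids): for a single user turn followed by an
    assistant turn, the returned pair might look like

        tokens = [1, 15, 27, 99, 2]
        mask   = [-100, -100, 27, 99, 2]

    i.e. positions belonging to non-generation_role turns are set to -100 (excluded from the
    loss) while the generation_role tokens and the trailing EOS are copied through unchanged.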
|
""" |
|
tokens = [] |
|
mask = [] |
|
if apply_mask is False: |
|
tokens = tokenizer.apply_chat_template(chat) |
|
mask = tokens |
|
return tokens, mask |
|
elif for_rm: |
|
tokens = tokenizer.apply_chat_template(chat) |
|
mask = [-100] * len(tokens) |
|
        if tokenizer.eos_token_id is not None:
            # Unmask only the appended EOS token; for RM data every chat token stays masked.
            mask.append(tokenizer.eos_token_id)
            tokens.append(tokenizer.eos_token_id)
|
else: |
|
raise ValueError( |
|
"Tokenizer does not have an EOS token, unable to determine good mask, please edit and make your own." |
|
) |
|
return tokens, mask |
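
    # Tokenize the chat incrementally: re-apply the chat template to chat[:i + 1] and keep only
    # the newly appended tokens, so that each turn can be masked independently.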
|
for i, turn in enumerate(chat): |
|
add_gen = ( |
|
False if i == len(chat) - 1 else chat[i + 1]["role"] == generation_role |
|
) |
|
chat_tokens = tokenizer.apply_chat_template( |
|
chat[: i + 1], add_generation_prompt=add_gen |
|
)[len(tokens) :] |
|
|
|
tokens.extend(chat_tokens) |
|
if only_last_turn and (i != len(chat) - 1): |
|
mask.extend([-100] * len(chat_tokens)) |
|
elif apply_mask and (turn["role"] != generation_role): |
|
mask.extend([-100] * len(chat_tokens)) |
|
else: |
|
mask.extend(chat_tokens) |
|
if tokenizer.eos_token_id is not None: |
|
mask.append(tokenizer.eos_token_id if mask[-1] != -100 else -100) |
|
tokens.append(tokenizer.eos_token_id) |
|
return tokens, mask |
|
|
|
|
|
class Encoder(object): |
|
def __init__(self, args): |
|
self.args = args |
|
|
|
def initializer(self): |
|
|
|
Encoder.tokenizer = AutoTokenizer.from_pretrained(self.args.tokenizer_path) |
|
|
|
def encode(self, text): |
|
ids = {} |
|
for key in self.args.jsonl_keys: |
|
text_ids, label_ids = build_chat( |
|
text[key], |
|
self.args.generation_role, |
|
not self.args.no_mask, |
|
Encoder.tokenizer, |
|
self.args.only_last, |
|
self.args.for_rm, |
|
) |
|
if self.args.reward_key is not None: |
|
reward = text[self.args.reward_key] |
|
if self.args.binary_reward: |
|
reward = [1] if reward else [-1] |
|
                elif isinstance(reward, float):
|
reward = [reward] |
|
ids[key] = (text_ids, label_ids, reward) |
|
else: |
|
ids[key] = (text_ids, label_ids, None) |
|
return ids, len(text) |
|
|
|
|
|
def get_args(): |
|
parser = argparse.ArgumentParser() |
|
group = parser.add_argument_group(title="input data") |
|
group.add_argument( |
|
"--input", |
|
type=str, |
|
required=True, |
|
help="Path to input jsonl files or lmd archive(s) - if using multiple archives, put them in a comma separated " |
|
"list", |
|
) |
|
group.add_argument( |
|
"--jsonl-keys", |
|
nargs="+", |
|
default=["conversation"], |
|
help="space separate listed of keys to extract from jsonl. Default: text", |
|
) |
|
group.add_argument( |
|
"--no-mask", |
|
help="If set, this will not mask any tokens in the input data.", |
|
action="store_true", |
|
) |
|
group.add_argument( |
|
"--for-rm", |
|
help="If set, this will mask everything except the last token in the chat.", |
|
action="store_true", |
|
) |
|
|
|
group.add_argument( |
|
"--generation-role", |
|
type=str, |
|
default="assistant", |
|
help="The role of the model generating the chat, usually 'assistant'. Default: assistant", |
|
) |
|
group.add_argument( |
|
"--only-last", |
|
help="If set, this will mask everything except the last turn in the chat.", |
|
action="store_true", |
|
) |
|
group.add_argument( |
|
"--reward-key", |
|
type=str, |
|
default=None, |
|
help="Optional: key to use for reward data in the input data.", |
|
) |
|
group.add_argument( |
|
"--binary-reward", |
|
help="If set, this will treat the reward data as a boolean.", |
|
action="store_true", |
|
) |
|
group.add_argument( |
|
"--num-docs", |
|
default=None, |
|
help="Optional: Number of documents in the input data (if known) for an accurate progress bar.", |
|
type=int, |
|
) |
|
group = parser.add_argument_group(title="tokenizer") |
|
group.add_argument( |
|
"--tokenizer-path", |
|
type=str, |
|
required=True, |
|
help="Path to HF Tokenizer.", |
|
) |
|
group.add_argument("--ftfy", action="store_true", help="Use ftfy to clean text") |
|
group = parser.add_argument_group(title="output data") |
|
group.add_argument( |
|
"--output-prefix", |
|
type=str, |
|
required=True, |
|
help="Path to binary output file without suffix", |
|
) |
|
group.add_argument( |
|
"--dataset-impl", |
|
type=str, |
|
default="mmap", |
|
choices=["lazy", "cached", "mmap"], |
|
help="Dataset implementation to use. Default: mmap", |
|
) |
|
|
|
group = parser.add_argument_group(title="runtime") |
|
group.add_argument( |
|
"--workers", type=int, default=1, help="Number of worker processes to launch" |
|
) |
|
group.add_argument( |
|
"--log-interval", |
|
type=int, |
|
default=100, |
|
help="Interval between progress updates", |
|
) |
|
args = parser.parse_args() |
|
args.keep_empty = False |
|
|
|
|
|
args.rank = 0 |
|
args.make_vocab_size_divisible_by = 128 |
|
args.model_parallel_size = 1 |
|
|
|
return args |
|
|
|
|
|
def yield_from_files(fnames: list, semaphore): |
|
""" |
|
    Iterator over input documents. Reads each jsonl file with `jsonlines`, yielding one parsed
    JSON object per line.
|
|
|
:param fnames: list of filenames |
|
""" |
|
|
|
def yielder(fname, semaphore): |
|
with open(fname, encoding="utf-8") as f: |
|
reader = jsonlines.Reader(f) |
|
            for doc in reader:
                semaphore.acquire()
                yield doc
|
|
|
for fname in fnames: |
|
semaphore.acquire() |
|
|
|
yield from yielder(fname, semaphore) |
|
|
|
|
|
def main(): |
|
args = get_args() |
|
encoder = Encoder(args) |
|
tokenizer = AutoTokenizer.from_pretrained(args.tokenizer_path) |
|
print(f"Vocab size: {tokenizer.vocab_size}") |
|
print(f"Output prefix: {args.output_prefix}") |
|
|
|
|
|
|
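    # Bound how far `yield_from_files` can run ahead of the encoder workers so the whole
    # dataset is not buffered in memory; the semaphore is released after each document is written.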
|
semaphore = Semaphore(10000 + args.workers) |
|
|
|
|
|
fin = yield_from_files(args.input.split(","), semaphore) |
|
|
|
if args.workers > 1: |
|
pool = multiprocessing.Pool(args.workers, initializer=encoder.initializer) |
|
encoded_docs = pool.imap(encoder.encode, fin, chunksize=25) |
|
else: |
|
encoder.initializer() |
|
encoded_docs = (encoder.encode(doc) for doc in fin) |
|
|
|
|
|
|
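    # One indexed-dataset builder per jsonl key, plus parallel "<key>_label" (loss mask)
    # builders and, when --reward-key is set, "<key>_reward" builders.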
|
output_bin_files = {} |
|
output_idx_files = {} |
|
builders = {} |
|
for key in args.jsonl_keys: |
|
output_bin_files[key] = "{}_{}_{}.bin".format( |
|
args.output_prefix, key, "document" |
|
) |
|
output_idx_files[key] = "{}_{}_{}.idx".format( |
|
args.output_prefix, key, "document" |
|
) |
|
builders[key] = indexed_dataset.make_builder( |
|
output_bin_files[key], |
|
impl=args.dataset_impl, |
|
vocab_size=tokenizer.vocab_size, |
|
) |
|
builders[key]._dtype = np.int32 |
|
if not args.no_mask: |
|
assert ( |
|
key + "_label" not in args.jsonl_keys |
|
), "label should not be included as it will be generated according to the mask." |
|
label_key = key + "_label" |
|
output_bin_files[label_key] = "{}_{}_{}.bin".format( |
|
args.output_prefix, label_key, "document" |
|
) |
|
output_idx_files[label_key] = "{}_{}_{}.idx".format( |
|
args.output_prefix, label_key, "document" |
|
) |
|
builders[label_key] = indexed_dataset.make_builder( |
|
output_bin_files[label_key], |
|
impl=args.dataset_impl, |
|
vocab_size=tokenizer.vocab_size, |
|
) |
|
builders[label_key]._dtype = np.int32 |
|
if args.reward_key is not None: |
|
assert ( |
|
key + "_reward" not in args.jsonl_keys |
|
), "reward should not be included as it will be generated from the data." |
|
reward_key = key + "_reward" |
|
output_bin_files[reward_key] = "{}_{}_{}.bin".format( |
|
args.output_prefix, reward_key, "document" |
|
) |
|
output_idx_files[reward_key] = "{}_{}_{}.idx".format( |
|
args.output_prefix, reward_key, "document" |
|
) |
|
builders[reward_key] = indexed_dataset.make_builder( |
|
output_bin_files[reward_key], |
|
impl=args.dataset_impl, |
|
vocab_size=tokenizer.vocab_size, |
|
) |
|
builders[reward_key]._dtype = np.int32 |
|
|
|
|
|
proc_start = time.time() |
|
total_bytes_processed = 0 |
|
pbar = tqdm.tqdm() |
|
for i, (doc, bytes_processed) in enumerate(encoded_docs, start=1): |
|
total_bytes_processed += bytes_processed |
|
|
|
|
|
semaphore.release() |
|
|
|
|
|
for key, conv in doc.items(): |
|
tokens = conv[0] |
|
token_mask = conv[1] |
|
reward = conv[2] |
|
builders[key].add_item(np.array(tokens, dtype=builders[key].dtype)) |
|
builders[key + "_label"].add_item( |
|
np.array(token_mask, dtype=builders[key + "_label"].dtype) |
|
) |
|
if args.reward_key is not None: |
|
builders[key + "_reward"].add_item( |
|
np.array(reward, dtype=builders[key + "_reward"].dtype) |
|
) |
|
|
|
builders[key].end_document() |
|
builders[key + "_label"].end_document() |
|
if args.reward_key is not None: |
|
builders[key + "_reward"].end_document() |
|
if i == 1: |
|
print("key: ", key) |
|
print("tokens: ", tokens) |
|
print("token_mask: ", token_mask) |
|
print("Reward: ", reward) |
|
|
|
if i % args.log_interval == 0: |
|
current = time.time() |
|
elapsed = current - proc_start |
|
mbs = total_bytes_processed / elapsed / 1024 / 1024 |
|
pbar.set_description( |
|
f"Processed {i}{'' if args.num_docs is None else '/' + str(args.num_docs)} documents ({i / elapsed} docs/s, {mbs} MB/s)." |
|
) |
|
if i != 0: |
|
pbar.update(args.log_interval) |
|
|
|
|
|
update_keys = args.jsonl_keys |
|
for key in update_keys: |
|
builders[key].finalize(output_idx_files[key]) |
|
builders[key + "_label"].finalize(output_idx_files[key + "_label"]) |
|
if args.reward_key is not None: |
|
builders[key + "_reward"].finalize(output_idx_files[key + "_reward"]) |
|
|
|
|
|
if __name__ == "__main__": |
|
main() |
|
|