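"""Reward functions for scoring generated summaries against their source texts.

Every reward class exposes a `weight` attribute and a
`__call__(sources=..., summaries=...) -> list[float]` interface, so that
`RewardAggregator` can combine any subset of them into one weighted score.
"""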
import numpy as np
import torch
import torch.nn.functional as F
from torch.nn.utils.rnn import pad_sequence
from sentence_transformers import SentenceTransformer, CrossEncoder
from sentence_transformers.util import pytorch_cos_sim
from transformers import AutoTokenizer, AutoModelForMaskedLM, AutoModelForCausalLM
from nltk import word_tokenize
from collections import defaultdict
from pprint import pprint
from collections import Counter
from rouge_score import rouge_scorer
ROUGE_TYPES = ["rouge1", "rouge2", "rougeL"]
# Module-level scorer instance; named in caps to avoid shadowing the imported
# `rouge_scorer` module.
ROUGE_SCORER = rouge_scorer.RougeScorer(
    ROUGE_TYPES,
    use_stemmer=True
)
def load_rewards(args):
rewards, names = [], []
for name, settings in args.rewards.items():
settings["device"] = args.device
print("Loading reward:", name)
pprint(settings)
print()
reward_cls = globals()[name]
reward_func = reward_cls(**settings)
rewards.append(reward_func)
names.append(name)
return RewardAggregator(rewards, names)
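# Hedged usage sketch (illustrative, not part of the original file): `args` is
# assumed to be a namespace with a `device` string and a `rewards` dict mapping
# class names from this module to their constructor kwargs, e.g.:
#
#   args.device = "cuda"
#   args.rewards = {
#       "Fluency": {"model_id": "distilroberta-base", "weight": 1},
#       "GaussianCR": {"mean": 0.45, "std": 0.3, "weight": 1},
#   }
#   aggregator = load_rewards(args)
#   final_scores, per_reward_scores = aggregator(sources, summaries)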
class RewardAggregator:
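    """Combines several reward functions into one weighted average per summary."""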
def __init__(self, reward_generators, reward_names):
self.reward_generators = reward_generators
self.reward_names = reward_names
self.weights = [rg.weight for rg in reward_generators]
self.n_rewards = len(reward_generators)
def __call__(self, sources, summaries):
name_to_scores = {}
for rg, name in zip(self.reward_generators, self.reward_names):
scores = rg(sources=sources, summaries=summaries)
name_to_scores[name] = scores
final_scores = []
for i in range(len(summaries)):
score = 0.
total_weights = 0.
for name, w in zip(self.reward_names, self.weights):
score += name_to_scores[name][i] * w
total_weights += w
score /= total_weights
final_scores.append(score)
return final_scores, name_to_scores
class Fluency:
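    """Scores fluency as the mean LM logit of each summary's own tokens,
    normalized into roughly [0, 1] via `max` or `minmax` scaling."""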
def __init__(
self,
model_id="distilroberta",
weight=1,
type="masked",
device="cuda",
norm="max",
max_score=40.,
min_score=-30.,
):
tokenizer = AutoTokenizer.from_pretrained(model_id)
if type == "masked":
pad_token_id = tokenizer.pad_token_id
model = AutoModelForMaskedLM.from_pretrained(model_id).to(device)
else:
pad_token_id = tokenizer.eos_token_id
model = AutoModelForCausalLM.from_pretrained(model_id).to(device)
self.model = model
self.tokenizer = tokenizer
self.weight = weight
self.device = device
self.max_score = max_score
self.min_score = min_score
self.pad_token_id = pad_token_id
self.norm = norm
assert self.norm in ("max", "minmax")
def ids_to_tokens(self, ids):
return [self.tokenizer._convert_id_to_token(id) for id in ids]
    def __call__(self, sources=None, summaries=None, normalize_len=False):
        # NOTE: normalize_len is accepted for interface compatibility but unused.
        summaries = [s if s != "" else " " for s in summaries]  # tokenization breaks on empty strings
input_ids = [self.tokenizer.encode(text) for text in summaries]
lens = [len(ids) for ids in input_ids]
input_ids = [torch.tensor(ids) for ids in input_ids]
input_ids = pad_sequence(
input_ids,
batch_first=True,
padding_value=self.pad_token_id
).to(self.device)
with torch.no_grad():
output = self.model(input_ids=input_ids, labels=input_ids)
logits = output["logits"]
scores = []
for i in range(logits.size(0)):
i_scores = []
for j in range(logits.size(1)):
tok_idx = input_ids[i, j]
if tok_idx == self.pad_token_id:
break
score = logits[i, j, tok_idx].item()
i_scores.append(score)
            mean_score = np.mean(i_scores)
            if self.norm == "max":
                i_score = mean_score / self.max_score
            else:  # "minmax"
                i_score = (mean_score - self.min_score) / (self.max_score - self.min_score)
            scores.append(i_score)
return scores
class BiEncoderSimilarity:
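    """Cosine similarity between bi-encoder embeddings of source and summary."""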
def __init__(
self,
model_id="all-distilroberta-v1",
device="cuda",
weight=1
):
self.model = SentenceTransformer(model_id).to(device)
self.weight = weight
def __call__(self, sources=None, summaries=None):
src_embs = self.model.encode(sources)
sum_embs = self.model.encode(summaries)
scores = []
for i in range(len(summaries)):
score = pytorch_cos_sim(
src_embs[i].reshape(1, -1),
sum_embs[i].reshape(1, -1),
)[0, 0].item()
scores.append(score)
return scores
class CrossEncoderSimilarity:
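    """Relatedness score from a cross-encoder run jointly on (source, summary) pairs."""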
def __init__(
self,
model_id="all-distilroberta-v1",
device="cuda",
weight=1
):
self.model = CrossEncoder(model_id, device=device)
self.weight = weight
def __call__(self, sources=None, summaries=None):
        scores = self.model.predict([
            (src, summ) for src, summ in zip(sources, summaries)  # avoid shadowing built-in `sum`
        ])
return scores.tolist()
class SelectedTokenSimilarity:
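    """For each summary token, takes the max token-embedding similarity over the
    source positions sharing the same surface token (0 if it never appears in
    the source), then averages over the summary."""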
def __init__(
self,
model_id="all-distilroberta-v1",
device="cuda",
weight=1
):
self.model = SentenceTransformer(model_id).to(device)
self.weight = weight
        self.tokenizer = self.model.tokenizer  # was `model.tokenizer`: NameError
def ids_to_tokens(self, ids):
return [self.tokenizer._convert_id_to_token(id) for id in ids]
def align_tokens(self, src, summary):
src_ids, sum_ids = self.tokenizer(
[src, summary],
truncation=True,
max_length=self.model.max_seq_length,
).input_ids
src_tokens = self.ids_to_tokens(src_ids)
sum_tokens = self.ids_to_tokens(sum_ids)
sum_to_src = defaultdict(list)
for i, sum_tok in enumerate(sum_tokens):
for j, src_tok in enumerate(src_tokens):
if sum_tok == src_tok:
sum_to_src[i].append(j)
if len(sum_to_src[i]) == 0:
sum_to_src[i] = None
return sum_to_src
def compute_score(self, x_sum, x_src, sum_to_src):
S = pytorch_cos_sim(x_sum, x_src).cpu().numpy()
scores = []
for i, J in sum_to_src.items():
if J is None:
i_score = 0.
else:
i_scores = [S[i, j] for j in J]
i_score = max(i_scores)
scores.append(i_score)
return np.mean(scores)
def __call__(self, sources=None, summaries=None):
src_embs = self.model.encode(sources, output_value="token_embeddings")
sum_embs = self.model.encode(summaries, output_value="token_embeddings")
scores = []
for i in range(len(summaries)):
x_src = src_embs[i]
x_sum = sum_embs[i]
sum_to_src = self.align_tokens(sources[i], summaries[i])
score = self.compute_score(x_sum, x_src, sum_to_src)
scores.append(score)
return scores
class NLIReward:
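    """Rewards summaries with the NLI entailment probability given the source."""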
def __init__(
self,
model_id="cross-encoder/nli-distilroberta-base",
device="cuda",
weight=1
):
        # `device` must be passed by keyword: CrossEncoder's second positional
        # argument is num_labels, not device.
        self.model = CrossEncoder(model_id, device=device)
self.label_mapping = ['contradiction', 'entailment', 'neutral']
self.weight = weight
    def __call__(self, sources=None, summaries=None):
        scores = self.model.predict([
            (src, summ) for src, summ in zip(sources, summaries)
        ])
        probs = torch.softmax(torch.tensor(scores), dim=1)
        # Reward is the entailment probability (index 1 in label_mapping).
        rewards = [probs[i, 1].item() for i in range(len(summaries))]
        # An empty summary is trivially entailed, so zero it out explicitly.
        rewards = [
            (0. if summaries[i].strip() == "" else r)
            for i, r in enumerate(rewards)
        ]
        return rewards
class GaussianLength:
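    """Gaussian reward over summary length in words, peaking at `mean` words."""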
def __init__(self, mean=11, std=0.3, max_len=100, weight=1, device=None):
self.weight = weight
lens = np.arange(0, max_len + 1)
scores = gaussian(lens, mean, std)
scores /= scores.max()
self.len_to_reward = dict((l, scores[l]) for l in lens)
self.max_len = max_len
def __call__(self, sources=None, summaries=None):
lens = [len(word_tokenize(s)) for s in summaries]
scores = [
self.len_to_reward[l] if l <= self.max_len else 0.
for l in lens
]
return scores
class GaussianCR:
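    """Gaussian reward over the summary/source compression ratio."""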
def __init__(self, mean=0.45, std=0.3, weight=1, device=None):
self.weight = weight
ratios = np.arange(0, 1.1, 0.01)
scores = gaussian(ratios, mean, std)
scores /= scores.max()
        # Round keys to 2 decimals so they match the lookup in __call__.
        self.ratio_to_reward = dict((round(r, 2), s) for r, s in zip(ratios, scores))
def __call__(self, sources=None, summaries=None):
source_lens = [len(word_tokenize(s)) for s in sources]
summary_lens = [len(word_tokenize(s)) for s in summaries]
ratios = [round(x / y, 2) for x, y in zip(summary_lens, source_lens)]
ratios = [min(1., x) for x in ratios]
return [
self.ratio_to_reward[round(ratio, 2)]
for ratio in ratios
]
class NoDaysReward:
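    """Penalizes summaries that mention weekdays or relative-day words."""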
def __init__(self, weight=1, device=None):
self.day_words = [
"monday", "tuesday", "wednesday",
"thursday", "friday", "saturday", "sunday",
"today", "tomorrow", "yesterday", "tonight"
]
self.weight = weight
def __call__(self, sources=None, summaries=None):
scores = []
for s in summaries:
s = s.lower()
            if any(w in s for w in self.day_words):
score = 0.
else:
score = 1.
scores.append(score)
return scores
def gaussian(x, mu, sig):
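    """Unnormalized Gaussian density; shapes the length and ratio rewards."""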
return np.exp(-np.power(x - mu, 2.) / (2 * np.power(sig, 2.)))
class RougeReward:
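    """ROUGE F-measure against reference targets (set `self.targets` first)."""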
def __init__(self, rouge_type="rougeL", weight=1, device=None):
self.rouge_type = rouge_type
self.weight = weight
        self.targets = None  # reference summaries; must be set externally before calling
def __call__(self, sources=None, summaries=None):
scores = []
for pred, tgt in zip(summaries, self.targets):
            rouge_scores = ROUGE_SCORER.score(tgt, pred)
score = rouge_scores[self.rouge_type].fmeasure
scores.append(score)
return scores
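# Hedged end-to-end sketch (illustrative only): uses the two model-free rewards
# so no checkpoints are downloaded; word_tokenize still requires NLTK's "punkt"
# data, and RougeReward would additionally need `.targets` set beforehand.
if __name__ == "__main__":
    sources = ["The quick brown fox jumps over the lazy dog near the river bank."]
    summaries = ["A fox jumps over a dog."]
    aggregator = RewardAggregator(
        [GaussianCR(mean=0.45, std=0.3), NoDaysReward()],
        ["GaussianCR", "NoDaysReward"],
    )
    final_scores, per_reward_scores = aggregator(sources, summaries)
    print(final_scores, per_reward_scores)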