import numpy as np
import torch
import torch.nn.functional as F
from torch.nn.utils.rnn import pad_sequence
from sentence_transformers import SentenceTransformer, CrossEncoder
from sentence_transformers.util import pytorch_cos_sim
from transformers import AutoTokenizer, AutoModelForMaskedLM, AutoModelForCausalLM
from nltk import word_tokenize
from collections import defaultdict, Counter
from pprint import pprint
from rouge_score import rouge_scorer

ROUGE_TYPES = ["rouge1", "rouge2", "rougeL"]
# Instantiate the scorer under a distinct name so the rouge_scorer module is not shadowed.
ROUGE_SCORER = rouge_scorer.RougeScorer(
    ROUGE_TYPES,
    use_stemmer=True
)
def load_rewards(args):
    rewards, names = [], []
    for name, settings in args.rewards.items():
        settings["device"] = args.device
        print("Loading reward:", name)
        pprint(settings)
        print()
        # Look up the reward class by name in this module and instantiate it with its settings.
        reward_cls = globals()[name]
        reward_func = reward_cls(**settings)
        rewards.append(reward_func)
        names.append(name)
    return RewardAggregator(rewards, names)
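# Illustrative shape of the config consumed by load_rewards: the keys must match class
# names defined in this module and the values are their constructor kwargs (the values
# below are hypothetical examples):
# args.rewards = {
#     "Fluency": {"weight": 1, "model_id": "distilroberta-base"},
#     "GaussianCR": {"weight": 1, "mean": 0.45, "std": 0.3},
# }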
class RewardAggregator:
    def __init__(self, reward_generators, reward_names):
        self.reward_generators = reward_generators
        self.reward_names = reward_names
        self.weights = [rg.weight for rg in reward_generators]
        self.n_rewards = len(reward_generators)

    def __call__(self, sources, summaries):
        name_to_scores = {}
        for rg, name in zip(self.reward_generators, self.reward_names):
            scores = rg(sources=sources, summaries=summaries)
            name_to_scores[name] = scores
        # The final reward for each summary is the weighted mean of the individual rewards.
        final_scores = []
        for i in range(len(summaries)):
            score = 0.
            total_weights = 0.
            for name, w in zip(self.reward_names, self.weights):
                score += name_to_scores[name][i] * w
                total_weights += w
            score /= total_weights
            final_scores.append(score)
        return final_scores, name_to_scores
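# Usage sketch (assumes an `args` object carrying `device` and a `rewards` dict as above):
# aggregator = load_rewards(args)
# final_scores, per_reward_scores = aggregator(sources=docs, summaries=candidates)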
class Fluency:
    def __init__(
        self,
        model_id="distilroberta-base",
        weight=1,
        type="masked",
        device="cuda",
        norm="max",
        max_score=40.,
        min_score=-30.,
    ):
        tokenizer = AutoTokenizer.from_pretrained(model_id)
        if type == "masked":
            pad_token_id = tokenizer.pad_token_id
            model = AutoModelForMaskedLM.from_pretrained(model_id).to(device)
        else:
            pad_token_id = tokenizer.eos_token_id
            model = AutoModelForCausalLM.from_pretrained(model_id).to(device)
        self.model = model
        self.tokenizer = tokenizer
        self.weight = weight
        self.device = device
        self.max_score = max_score
        self.min_score = min_score
        self.pad_token_id = pad_token_id
        self.norm = norm
        assert self.norm in ("max", "minmax")

    def ids_to_tokens(self, ids):
        return self.tokenizer.convert_ids_to_tokens(ids)

    def __call__(self, sources=None, summaries=None, normalize_len=False):
        # The model call breaks on empty strings, so substitute a single space.
        summaries = [s if s != "" else " " for s in summaries]
        input_ids = [self.tokenizer.encode(text) for text in summaries]
        lens = [len(ids) for ids in input_ids]
        input_ids = [torch.tensor(ids) for ids in input_ids]
        input_ids = pad_sequence(
            input_ids,
            batch_first=True,
            padding_value=self.pad_token_id
        ).to(self.device)
        with torch.no_grad():
            output = self.model(input_ids=input_ids, labels=input_ids)
        logits = output["logits"]
        scores = []
        for i in range(logits.size(0)):
            # Average the logit assigned to each non-padding summary token.
            i_scores = []
            for j in range(logits.size(1)):
                tok_idx = input_ids[i, j]
                if tok_idx == self.pad_token_id:
                    break
                score = logits[i, j, tok_idx].item()
                i_scores.append(score)
            mean_score = np.mean(i_scores)
            # Normalize the mean token logit into an approximate [0, 1] range.
            if self.norm == "max":
                i_score = mean_score / self.max_score
            else:
                i_score = (mean_score - self.min_score) / (self.max_score - self.min_score)
            scores.append(i_score)
        return scores
class BiEncoderSimilarity:
    def __init__(
        self,
        model_id="all-distilroberta-v1",
        device="cuda",
        weight=1
    ):
        self.model = SentenceTransformer(model_id).to(device)
        self.weight = weight

    def __call__(self, sources=None, summaries=None):
        # Cosine similarity between the pooled source and summary embeddings.
        src_embs = self.model.encode(sources)
        sum_embs = self.model.encode(summaries)
        scores = []
        for i in range(len(summaries)):
            score = pytorch_cos_sim(
                src_embs[i].reshape(1, -1),
                sum_embs[i].reshape(1, -1),
            )[0, 0].item()
            scores.append(score)
        return scores
class CrossEncoderSimilarity:
    def __init__(
        self,
        model_id="all-distilroberta-v1",
        device="cuda",
        weight=1
    ):
        self.model = CrossEncoder(model_id, device=device)
        self.weight = weight

    def __call__(self, sources=None, summaries=None):
        scores = self.model.predict([
            (src, summ) for src, summ in zip(sources, summaries)
        ])
        return scores.tolist()
class SelectedTokenSimilarity:
    def __init__(
        self,
        model_id="all-distilroberta-v1",
        device="cuda",
        weight=1
    ):
        self.model = SentenceTransformer(model_id).to(device)
        self.weight = weight
        self.tokenizer = self.model.tokenizer

    def ids_to_tokens(self, ids):
        return self.tokenizer.convert_ids_to_tokens(ids)

    def align_tokens(self, src, summary):
        # Map each summary token position to the source positions holding the same token.
        src_ids, sum_ids = self.tokenizer(
            [src, summary],
            truncation=True,
            max_length=self.model.max_seq_length,
        ).input_ids
        src_tokens = self.ids_to_tokens(src_ids)
        sum_tokens = self.ids_to_tokens(sum_ids)
        sum_to_src = defaultdict(list)
        for i, sum_tok in enumerate(sum_tokens):
            for j, src_tok in enumerate(src_tokens):
                if sum_tok == src_tok:
                    sum_to_src[i].append(j)
            if len(sum_to_src[i]) == 0:
                sum_to_src[i] = None
        return sum_to_src

    def compute_score(self, x_sum, x_src, sum_to_src):
        # For each summary token, take its best cosine similarity among its aligned
        # source tokens; the reward is the mean over all summary tokens.
        S = pytorch_cos_sim(x_sum, x_src).cpu().numpy()
        scores = []
        for i, J in sum_to_src.items():
            if J is None:
                i_score = 0.
            else:
                i_scores = [S[i, j] for j in J]
                i_score = max(i_scores)
            scores.append(i_score)
        return np.mean(scores)

    def __call__(self, sources=None, summaries=None):
        src_embs = self.model.encode(sources, output_value="token_embeddings")
        sum_embs = self.model.encode(summaries, output_value="token_embeddings")
        scores = []
        for i in range(len(summaries)):
            x_src = src_embs[i]
            x_sum = sum_embs[i]
            sum_to_src = self.align_tokens(sources[i], summaries[i])
            score = self.compute_score(x_sum, x_src, sum_to_src)
            scores.append(score)
        return scores
class NLIReward:
    def __init__(
        self,
        model_id="cross-encoder/nli-distilroberta-base",
        device="cuda",
        weight=1
    ):
        self.model = CrossEncoder(model_id, device=device)
        self.label_mapping = ['contradiction', 'entailment', 'neutral']
        self.weight = weight

    def __call__(self, sources=None, summaries=None):
        scores = self.model.predict([
            (src, summ) for src, summ in zip(sources, summaries)
        ])
        probs = torch.softmax(torch.tensor(scores), dim=1)
        labels = [
            self.label_mapping[score_max] for score_max in scores.argmax(axis=1)
        ]
        # The reward is the entailment probability; empty summaries get zero.
        rewards = [probs[i, 1].item() for i in range(len(summaries))]
        rewards = [
            (0 if summaries[i].strip() == "" else r)
            for i, r in enumerate(rewards)
        ]
        return rewards
class GaussianLength:
    def __init__(self, mean=11, std=0.3, max_len=100, weight=1, device=None):
        self.weight = weight
        # Precompute a bell-shaped reward over summary lengths, peaked at `mean` tokens.
        lens = np.arange(0, max_len + 1)
        scores = gaussian(lens, mean, std)
        scores /= scores.max()
        self.len_to_reward = dict((l, scores[l]) for l in lens)
        self.max_len = max_len

    def __call__(self, sources=None, summaries=None):
        lens = [len(word_tokenize(s)) for s in summaries]
        scores = [
            self.len_to_reward[l] if l <= self.max_len else 0.
            for l in lens
        ]
        return scores
class GaussianCR:
    def __init__(self, mean=0.45, std=0.3, weight=1, device=None):
        self.weight = weight
        # Precompute a bell-shaped reward over compression ratios, peaked at `mean`.
        # Keys are rounded to 2 decimals so lookups in __call__ match exactly.
        ratios = np.arange(0, 1.1, 0.01)
        scores = gaussian(ratios, mean, std)
        scores /= scores.max()
        self.ratio_to_reward = dict((round(r, 2), s) for r, s in zip(ratios, scores))

    def __call__(self, sources=None, summaries=None):
        source_lens = [len(word_tokenize(s)) for s in sources]
        summary_lens = [len(word_tokenize(s)) for s in summaries]
        ratios = [round(x / y, 2) for x, y in zip(summary_lens, source_lens)]
        ratios = [min(1., x) for x in ratios]
        return [
            self.ratio_to_reward[round(ratio, 2)]
            for ratio in ratios
        ]
class NoDaysReward:
    def __init__(self, weight=1, device=None):
        # Reward 1 if the summary mentions none of these day words, 0 otherwise.
        self.day_words = [
            "monday", "tuesday", "wednesday",
            "thursday", "friday", "saturday", "sunday",
            "today", "tomorrow", "yesterday", "tonight"
        ]
        self.weight = weight

    def __call__(self, sources=None, summaries=None):
        scores = []
        for s in summaries:
            s = s.lower()
            if any(w in s for w in self.day_words):
                score = 0.
            else:
                score = 1.
            scores.append(score)
        return scores
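# gaussian(x, mu, sig) = exp(-(x - mu)^2 / (2 * sig^2)): an unnormalized Gaussian that
# equals 1.0 at x == mu and decays symmetrically, e.g. gaussian(0.45, 0.45, 0.3) == 1.0.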
def gaussian(x, mu, sig):
    return np.exp(-np.power(x - mu, 2.) / (2 * np.power(sig, 2.)))
class RougeReward:
    def __init__(self, rouge_type="rougeL", weight=1, device=None):
        self.rouge_type = rouge_type
        self.weight = weight
        # Reference summaries; must be assigned externally before this reward is called.
        self.targets = None

    def __call__(self, sources=None, summaries=None):
        scores = []
        for pred, tgt in zip(summaries, self.targets):
            rouge_scores = ROUGE_SCORER.score(tgt, pred)
            score = rouge_scores[self.rouge_type].fmeasure
            scores.append(score)
        return scores
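# Minimal smoke-test sketch using only the lightweight rewards (no model downloads);
# assumes the NLTK "punkt" tokenizer data is available for word_tokenize.
if __name__ == "__main__":
    source = "The committee postponed the vote because several members were absent from the session."
    summary = "The committee postponed the vote."
    rewards = [GaussianCR(mean=0.45, std=0.3), NoDaysReward()]
    names = ["GaussianCR", "NoDaysReward"]
    aggregator = RewardAggregator(rewards, names)
    final_scores, per_reward_scores = aggregator(sources=[source], summaries=[summary])
    print("final scores:", final_scores)
    pprint(per_reward_scores)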