import pandas as pd
import numpy as np
import os
import torch
from transformers import pipeline
import streamlit as st
import plotly.express as px
import plotly.figure_factory as ff
from captum.attr import LayerIntegratedGradients, TokenReferenceBase, visualization
from captum.attr import visualization as viz
from captum import attr
from captum.attr._utils.visualization import format_word_importances, format_special_tokens, _get_color
# Select the "python" protobuf implementation via the environment.
# NOTE(review): this assignment runs after `transformers` has already been
# imported above; if protobuf is loaded during that import, the setting may
# take effect too late -- confirm whether it needs to precede the imports.
os.environ["PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION"] = "python"
def results_to_df(results: dict, metric_name: str) -> pd.DataFrame:
    """Collect one metric from per-topic results into a single-column DataFrame.

    Args:
        results: Mapping of topic id -> mapping of metric name -> value.
        metric_name: The metric to extract.

    Returns:
        A DataFrame with one column named ``metric_name`` holding the metric
        value of every topic that reports it, in ``results`` iteration order.
        Topics that do not report the metric are skipped. The topic ids
        themselves are not kept.
    """
    # Direct key lookup instead of scanning every metric of every topic.
    metric_scores = [
        topic_metrics[metric_name]
        for topic_metrics in results.values()
        if metric_name in topic_metrics
    ]
    return pd.DataFrame({metric_name: metric_scores})
def create_boxplot_1df(results: dict, metric_name: str):
    """Return a Plotly box plot of ``metric_name`` for a single run.

    ``results`` is converted with ``results_to_df`` and the resulting column
    is plotted on the y axis.
    """
    scores = results_to_df(results, metric_name)
    return px.box(scores, y=metric_name)
def create_boxplot_2df(results1, results2, metric_name):
    """Return a histogram comparing ``metric_name`` across two runs.

    Each run is converted with ``results_to_df`` and tagged with a "Run"
    column ("Run 1" / "Run 2"). The two frames are stacked and drawn as a
    histogram coloured by run, with a marginal box plot.
    """
    labelled = []
    for run_label, run_results in (("Run 1", results1), ("Run 2", results2)):
        frame = results_to_df(run_results, metric_name)
        frame["Run"] = run_label
        labelled.append(frame)

    combined = pd.concat(labelled)
    return px.histogram(
        combined,
        x=metric_name,
        color="Run",
        marginal="box",
        hover_data=combined.columns,
    )
def create_boxplot_diff(results1, results2, metric_name):
    """Return a histogram of per-topic differences (run 1 minus run 2).

    Args:
        results1: Mapping of topic id -> {metric name -> value} for run 1.
        results2: Same structure for run 2.
        metric_name: The metric whose difference is plotted.

    Returns:
        A Plotly histogram (with a marginal box plot) of
        ``results1[topic][metric_name] - results2[topic][metric_name]``.

    Scores are paired by topic id rather than by row position, so the result
    stays correct when the two runs list topics in a different order or do
    not cover the same topics. Topics for which either run lacks the metric
    are left out.
    """
    diffs = [
        scores1[metric_name] - results2[topic][metric_name]
        for topic, scores1 in results1.items()
        if metric_name in scores1 and metric_name in results2.get(topic, {})
    ]
    x_axis = f"Difference in {metric_name} from 1 to 2"
    return px.histogram(pd.DataFrame({x_axis: diffs}), x=x_axis, marginal="box")
def summarize_attributions(attributions):
    """Collapse attributions over the last dimension and L2-normalise them.

    Args:
        attributions: Tensor of attribution values. The last dimension is
            summed away and a leading dimension of size 1 (if present) is
            squeezed out.

    Returns:
        The summed attributions divided by their norm. If the norm is zero
        (all attributions are zero) the summed tensor is returned as is, so
        the result never becomes NaN through a division by zero.
    """
    summed = attributions.sum(dim=-1).squeeze(0)
    norm = torch.norm(summed)
    # Guard the degenerate all-zero case: x / 0 would turn every score into NaN.
    if norm > 0:
        summed = summed / norm
    return summed
def get_words(words, importances):
    """Format each token for display, one output string per token.

    ``importances`` is cut to ``len(words)`` before pairing, so surplus
    scores are ignored. Every token is passed through captum's
    ``format_special_tokens`` and then rendered with the ``'{word}'``
    template.

    Returns:
        List of formatted token strings, in input order.
    """
    rendered = []
    paired_scores = importances[: len(words)]
    for token, score in zip(words, paired_scores):
        display_token = format_special_tokens(token)
        # NOTE(review): the colour is computed and handed to .format(), but
        # the template has no {color} placeholder, so it never reaches the
        # output -- confirm whether colour markup is missing from the template.
        shade = _get_color(score)
        rendered.append('{word}'.format(color=shade, word=display_token))
    return rendered
@st.cache_resource
def get_model(model_name: str):
    """Load a MonoT5 reranking pipeline and its prompt formatter.

    Args:
        model_name: Either ``"MonoT5-Small"`` or ``"MonoT5-3B"``.

    Returns:
        A ``(pipe, formatter)`` tuple: a CPU ``text2text-generation``
        pipeline for the matching checkpoint, and a function
        ``formatter(query, doc)`` that builds the model's input prompt.

    Raises:
        ValueError: If ``model_name`` is not a supported model. Previously an
            unrecognised name fell through both branches and failed with an
            ``UnboundLocalError`` at the return statement.
    """
    checkpoints = {
        "MonoT5-Small": "castorini/monot5-small-msmarco-10k",
        "MonoT5-3B": "castorini/monot5-3b-msmarco-10k",
    }
    try:
        checkpoint = checkpoints[model_name]
    except KeyError:
        supported = ", ".join(sorted(checkpoints))
        raise ValueError(
            f"Unknown model name {model_name!r}; expected one of: {supported}"
        ) from None

    pipe = pipeline(
        'text2text-generation',
        model=checkpoint,
        tokenizer=checkpoint,
        device='cpu',
    )

    def formatter(query, doc):
        return f"Query: {query} Document: {doc} Relevant:"

    return pipe, formatter
def prep_func(pipe, formatter):
    """Build a cached saliency function for a MonoT5-style pipeline.

    Args:
        pipe: A ``text2text-generation`` pipeline exposing ``tokenizer`` and
            ``model`` (as returned by ``get_model``).
        formatter: Callable ``formatter(query, doc) -> str`` that builds the
            model input prompt.

    Returns:
        ``get_saliency(query, doc)``, which returns the formatted input
        tokens (one per line, as produced by ``get_words``) after computing
        gradient saliency of the "true" probability with respect to the
        encoder input embeddings.
    """
    # ---- one-time setup, shared by every (query, doc) pair ----
    # Seed the decoder with the model's start token. Tokenizing an empty
    # string without special tokens produces no token ids, which would leave
    # nothing for the `[:, -1, :]` logit lookup below.
    start_token_id = pipe.model.config.decoder_start_token_id
    if start_token_id is None:
        start_token_id = pipe.tokenizer.pad_token_id
    decoder_input_ids = torch.tensor([[start_token_id]], dtype=torch.long).to('cpu')
    decoder_embedding_layer = pipe.model.base_model.decoder.embed_tokens
    decoder_inputs_emb = decoder_embedding_layer(decoder_input_ids)

    # Build the vocabulary dict once rather than once per looked-up token.
    vocab = pipe.tokenizer.get_vocab()
    token_false_id = vocab['▁false']
    token_true_id = vocab["▁true"]

    embedding_layer = pipe.model.base_model.encoder.embed_tokens

    def forward_from_embeddings(inputs_embeds, decoder_inputs_embeds):
        """Return P("true") over {"false", "true"} at the last decoder step."""
        logits = pipe.model.forward(
            inputs_embeds=inputs_embeds,
            decoder_inputs_embeds=decoder_inputs_embeds,
        )['logits'][:, -1, :]
        batch_scores = logits[:, [token_false_id, token_true_id]]
        batch_scores = torch.nn.functional.log_softmax(batch_scores, dim=1)
        return batch_scores[:, 1].exp()  # probability of the "true" token

    # ---- per-pair computation ----
    @st.cache_data
    def get_saliency(query, doc):
        input_ids = pipe.tokenizer(
            [formatter(query, doc)],
            padding=False,
            truncation=True,
            return_tensors="pt",
            max_length=pipe.tokenizer.model_max_length,
        )["input_ids"].to('cpu')
        inputs_emb = embedding_layer(input_ids)

        saliency = attr.Saliency(forward_from_embeddings)
        # Saliency.attribute returns one attribution tensor per input tensor;
        # only the encoder-side attributions are used for display.
        encoder_attributions, _decoder_attributions = saliency.attribute(
            inputs=(inputs_emb, decoder_inputs_emb)
        )
        attributions_normed = summarize_attributions(encoder_attributions)
        tokens = pipe.tokenizer.convert_ids_to_tokens(input_ids.squeeze(0).tolist())
        return "\n".join(get_words(tokens, attributions_normed))

    return get_saliency
if __name__ == "__main__":
    # Smoke test: print the saliency output for one query/document pair.
    query = "how to add dll to visual studio?"
    doc = "StackOverflow In the days of 16-bit Windows, a WPARAM was a 16-bit word, while LPARAM was a 32-bit long. These distinctions went away in Win32; they both became 32-bit values. ... WPARAM is defined as UINT_PTR , which in 64-bit Windows is an unsigned, 64-bit value."
    # get_model only recognises "MonoT5-Small" and "MonoT5-3B"; the bare
    # "MonoT5" used before matched neither and crashed inside get_model.
    model, formatter = get_model("MonoT5-Small")
    get_saliency = prep_func(model, formatter)
    print(get_saliency(query, doc))