import streamlit as st
import spacy
import torch
import torch.nn as nn
import pandas as pd
from transformers import BertTokenizer, BertModel, AutoConfig
from transformers.models.bert.modeling_bert import BertForMaskedLM
from models.spabert.models.spatial_bert_model import SpatialBertConfig, SpatialBertForMaskedLM, SpatialBertModel
from models.spabert.utils.common_utils import load_spatial_bert_pretrained_weights
from models.spabert.datasets.osm_sample_loader import PbfMapDataset
from torch.utils.data import DataLoader
from PIL import Image
device = torch.device('cpu')
dev_mode = True
#Spacy Initialization Section
nlp = spacy.load("./models/en_core_web_sm")
#BERT Initialization Section
bert_tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
bert_model = BertModel.from_pretrained("bert-base-uncased")
bert_model.to(device)
bert_model.eval()
#SpaBERT Initialization Section
data_file_path = 'models/spabert/datasets/SpaBERTPivots.json' #Sample file otherwise this model will take too long on CPU.
pretrained_model_path = 'models/spabert/datasets/fine-spabert-base-uncased-finetuned-osm-mn.pth'
config = SpatialBertConfig()
config.output_hidden_states = True
spaBERT_model = SpatialBertForMaskedLM(config)
pre_trained_model = torch.load(pretrained_model_path, map_location=torch.device('cpu'))
spaBERT_model.load_state_dict(bert_model.state_dict(), strict = False)
spaBERT_model.load_state_dict(pre_trained_model, strict=False)
spaBERT_model.to(device)
spaBERT_model.eval()
#Load data using SpatialDataset
spatialDataset = PbfMapDataset(data_file_path = data_file_path,
tokenizer = bert_tokenizer,
max_token_len = 256, #Originally 300
#max_token_len = max_seq_length, #Originally 300
distance_norm_factor = 0.0001,
spatial_dist_fill = 20,
with_type = False,
sep_between_neighbors = True,
label_encoder = None,
mode = None) #If set to None it will use the full dataset for mlm
data_loader = DataLoader(spatialDataset, batch_size=1, num_workers=0, shuffle=False, pin_memory=False, drop_last=False)
# Create a dictionary to map entity names to indices
entity_index_dict = {entity['pivot_name']: i for i, entity in enumerate(spatialDataset)}
# Ensure names are stored in lowercase for case-insensitive matching
entity_index_dict = {name.lower(): index for name, index in entity_index_dict.items()}
#Pre-aquire the SpaBERT embeddings for all geo-entities within our dataset
def process_entity(batch, model, device):
input_ids = batch['masked_input'].to(device)
attention_mask = batch['attention_mask'].to(device)
position_list_x = batch['norm_lng_list'].to(device)
position_list_y = batch['norm_lat_list'].to(device)
sent_position_ids = batch['sent_position_ids'].to(device)
pseudo_sentence = batch['pseudo_sentence'].to(device)
# Convert tensor to list of token IDs, and decode them into a readable sentence
pseudo_sentence_decoded = bert_tokenizer.decode(pseudo_sentence[0].tolist(), skip_special_tokens=False)
with torch.no_grad():
outputs = spaBERT_model(#input_ids=input_ids,
input_ids=pseudo_sentence,
attention_mask=attention_mask,
sent_position_ids=sent_position_ids,
position_list_x=position_list_x,
position_list_y=position_list_y)
#NOTE: we are ommitting the pseudo_sentence here. Verify that this is correct
spaBERT_embedding = outputs.hidden_states[-1].to(device)
# Extract the [CLS] token embedding (first token)
spaBERT_embedding = spaBERT_embedding[:, 0, :].detach() # [batch_size, hidden_size]
#return pivot_embeddings.cpu().numpy(), input_ids.cpu().numpy()
return spaBERT_embedding, input_ids, pseudo_sentence_decoded
spaBERT_embeddings = []
pseudo_sentences = []
for batch in (data_loader):
spaBERT_embedding, input_ids, pseudo_sentence = process_entity(batch, spaBERT_model, device)
spaBERT_embeddings.append(spaBERT_embedding)
pseudo_sentences.append(pseudo_sentence)
embedding_cache = {}
#Get BERT Embedding for review
def get_bert_embedding(review_text):
#tokenize review
inputs = bert_tokenizer(review_text, return_tensors='pt', padding=True, truncation=True).to(device)
# Forward pass through the BERT model
with torch.no_grad():
outputs = bert_model(**inputs)
# Extract embeddings from the last hidden state
bert_embedding = outputs.last_hidden_state[:, 0, :].detach() #CLS Token
return bert_embedding
#Get SpaBERT Embedding for geo-entity
def get_spaBert_embedding(entity,current_pseudo_sentences):
entity_index = entity_index_dict.get(entity.lower(), None)
if entity_index is None:
if(dev_mode == True):
st.write("Got Bert embedding for: ", entity)
return get_bert_embedding(entity) #Fallback in-case SpaBERT could not resolve entity to retrieve embedding. Rare-cases only.
else:
current_pseudo_sentences.append(pseudo_sentences[entity_index])
if(dev_mode == True):
st.write("Got SpaBert embedding for: ", entity)
return spaBERT_embeddings[entity_index]
#Go through each review, identify all geo-entities, then extract their SpaBERT embedings
def processSpatialEntities(review, nlp):
doc = nlp(review)
entity_spans = [(ent.start, ent.end, ent.text, ent.label_) for ent in doc.ents]
token_embeddings = []
current_pseudo_sentences = []
# Iterate over each entity span and process only geo entities
for start, end, text, label in entity_spans:
if label in ['FAC', 'ORG', 'LOC', 'GPE']: # Filter to geo-entities
if(dev_mode == True):
st.write("Text found:", text)
spaBert_emb = get_spaBert_embedding(text,current_pseudo_sentences)
token_embeddings.append(spaBert_emb)
if(dev_mode == True):
st.write("Geo-Entity Found in review: ", text)
token_embeddings = torch.stack(token_embeddings, dim=0)
processed_embedding = token_embeddings.mean(dim=0) # Shape: (768)
#processed_embedding = processed_embedding.unsqueeze(0) # Shape: (1, 768)
return processed_embedding,current_pseudo_sentences
#Initialize discriminator module
class Discriminator(nn.Module):
def __init__(self, input_size=512, hidden_sizes=[512], num_labels=2, dropout_rate=0.1):
super(Discriminator, self).__init__()
self.input_dropout = nn.Dropout(p=dropout_rate)
layers = []
hidden_sizes = [input_size] + hidden_sizes
for i in range(len(hidden_sizes)-1):
layers.extend([nn.Linear(hidden_sizes[i], hidden_sizes[i+1]), nn.LeakyReLU(0.2, inplace=True), nn.Dropout(dropout_rate)])
self.layers = nn.Sequential(*layers) #per il flatten
self.logit = nn.Linear(hidden_sizes[-1],num_labels+1) # +1 for the probability of this sample being fake/real.
self.softmax = nn.Softmax(dim=-1)
def forward(self, input_rep):
input_rep = self.input_dropout(input_rep)
last_rep = self.layers(input_rep)
logits = self.logit(last_rep)
probs = self.softmax(logits)
return last_rep, logits, probs
dConfig = AutoConfig.from_pretrained("bert-base-uncased")
hidden_size = int(dConfig.hidden_size)
num_hidden_layers_d = 2;
hidden_levels_d = [hidden_size for i in range(0, num_hidden_layers_d)]
label_list = ["1", "0"]
label_list.append('UNL')
out_dropout_rate = 0.5;
discriminator = Discriminator(input_size=hidden_size*2, hidden_sizes=hidden_levels_d, num_labels=len(label_list), dropout_rate=out_dropout_rate).to(device)
discriminator_weights = ('data/datasets/discriminator_weights.pth')
discriminator.load_state_dict(torch.load(discriminator_weights,map_location=torch.device('cpu')))
discriminator.eval()
def get_prediction(embeddings):
with torch.no_grad():
# Forward pass through the discriminator to get the logits and probabilities
last_rep, logits, probs = discriminator(embeddings)
# Filter logits to ignore the last dimension (assuming you only care about the first two)
filtered_logits = logits[:, 0:-1]
# Get the predicted labels using the filtered logits
_, predicted_labels = torch.max(filtered_logits, dim=-1)
# Convert to numpy array if needed
predicted_labels = predicted_labels.cpu().numpy()
return predicted_labels
# Function to read reviews from a text file
def load_reviews_from_file(file_path):
reviews = {}
try:
with open(file_path, 'r', encoding='utf-8') as file:
for i, line in enumerate(file):
line = line.strip()
if line: # Ensure the line is not empty
reviews[f"Review {i + 1}"] = line
except FileNotFoundError:
st.error(f"File not found: {file_path}")
return reviews
#Demo Section
st.title("SpaGAN Demo: IN PROGRESS")
st.write("This demo allows you to select from a list of sample reviews that contrain both real and fake reviews.")
st.write("Once selected, the identified geo-entities within the review will be color coded and displayed along with the review.")
st.write("The model will construct a pseudo-sentence for each entity within the review, contextualizing each geo-entities closest neighbors from our dataset.")
st.write("Finally, the entire review will be embedded and combined with the spatial embeddings and the model will determine whether the review is real or fake.")
# Define a color map and descriptions for different entity types
COLOR_MAP = {
'FAC': ('red', 'Facilities (e.g., buildings, airports)'),
'ORG': ('blue', 'Organizations (e.g., companies, institutions)'),
'LOC': ('purple', 'Locations (e.g., mountain ranges, water bodies)'),
'GPE': ('green', 'Geopolitical Entities (e.g., countries, cities)')
}
# Display the color key
st.write("**Color Key:**")
for label, (color, description) in COLOR_MAP.items():
st.markdown(f"- **{label}**: {color} - {description}", unsafe_allow_html=True)
review_file_path = "models/spabert/datasets/SampleReviews.txt"
example_reviews = load_reviews_from_file(review_file_path)
# Define labels
review_labels = {
"Review 1": "Real",
"Review 2": "Spam",
}
# Create options with labels for the dropdown
dropdown_options = [f"{key} ({review_labels.get(key, 'Unknown')})" for key in example_reviews.keys()]
# Dropdown for selecting an example review
user_selection = st.selectbox("Select an example review", options=dropdown_options)
# Extract the original review key from the selected option
selected_key = user_selection.split(" (")[0] # Remove the label part
selected_review = example_reviews[selected_key]
lower_case_review = selected_review.lower()
# Process the text when the button is clicked
if st.button("Process Review"):
if lower_case_review.strip():
bert_embedding = get_bert_embedding(lower_case_review)
spaBert_embedding, current_pseudo_sentences = processSpatialEntities(selected_review,nlp)
combined_embedding = torch.cat((bert_embedding,spaBert_embedding),dim=-1)
if(dev_mode == True):
st.write("Review Embedding Shape:", bert_embedding.shape)
st.write("Geo-Entities embedding shape: ", spaBert_embedding.shape)
st.write("Concatenated Embedding Shape:", combined_embedding.shape)
st.write("Concatenated Embedding:", combined_embedding)
prediction = get_prediction(combined_embedding)
# Process the text using spaCy
doc = nlp(selected_review)
# Highlight geo-entities with different colors
highlighted_text = selected_review
for ent in reversed(doc.ents):
if ent.label_ in COLOR_MAP:
color = COLOR_MAP[ent.label_][0]
highlighted_text = (
highlighted_text[:ent.start_char] +
f"{ent.text}" +
highlighted_text[ent.end_char:]
)
# Display the highlighted text with HTML support
st.markdown(highlighted_text, unsafe_allow_html=True)
#Display pseudo sentences found
for sentence in current_pseudo_sentences:
clean_sentence = sentence.replace("[PAD]", "").strip()
st.write("Pseudo-Sentence:", clean_sentence)
#Display the models prediction
if(prediction == 0):
st.write("Prediction: Not Spam")
elif(prediction == 1):
st.write("Prediction: Spam")
else:
st.write("error during prediction")
else:
st.error("Please select a review.")