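"""Streamlit app that scores firm-generated text on eight risk-factor categories
(finance, accounting, technology, international, operations, marketing, management,
and legal). For each category, a fine-tuned sequence-classification model hosted on
the Hugging Face Hub returns the probability that the text reflects that risk factor.
Input is either a CSV of firm-year observations (with an 'Item 1A' or 'Text' column)
or a single TXT file containing one risk factors section."""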
import os
import streamlit as st
import torch
import pandas as pd
import time
from tqdm import tqdm
from transformers import AutoModelForSequenceClassification, AutoTokenizer
# Set up Streamlit app
st.title("An App to Score Firm-Generated Text on Eight Risk Factors")
st.write("Note: You can either upload a CSV file or a single TXT file for scoring.")
st.write("If uploading a CSV file, ensure that it contains the following columns: cik, fyear, Item 1A (or Text). Item 1A should contain the respective risk factors section for each firm-year observation.")
st.write("If uploading a txt file, ensure it contains the respective risk factors section for each firm-year observation.")
# Hugging Face model directories
model_directories = {
    'finance': 'mgmtprofessor/finance_risk_factors',
    'accounting': 'mgmtprofessor/accounting_risk_factors',
    'technology': 'mgmtprofessor/technology_risk_factors',
    'international': 'mgmtprofessor/international_risk_factors',
    'operations': 'mgmtprofessor/operations_risk_factors',
    'marketing': 'mgmtprofessor/marketing_risk_factors',
    'management': 'mgmtprofessor/management_risk_factors',
    'legal': 'mgmtprofessor/legal_risk_factors',
}
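# Each entry maps a risk-factor category to the Hugging Face Hub repository of the
# fine-tuned sequence-classification model used to score that category.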
# Check if CUDA is available and pick the device the models will run on
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
# Function to load a model from Hugging Face
def load_model(category):
    try:
        model_name = model_directories.get(category)
        if model_name:
            tokenizer = AutoTokenizer.from_pretrained(model_name)
            model = AutoModelForSequenceClassification.from_pretrained(model_name)
            model.to(device)  # Run on GPU when available, otherwise CPU
            model.eval()      # Inference only; disables dropout
            return model, tokenizer
        else:
            st.error(f"No Hugging Face model found for {category}")
            return None, None
    except Exception as e:
        st.error(f"Failed to load model for {category}: {e}")
        return None, None
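
# A minimal caching wrapper (a sketch, not part of the original app): Streamlit reruns
# this script on every interaction, so each model would otherwise be downloaded and
# re-instantiated on every rerun. st.cache_resource keeps the loaded model and tokenizer
# in memory across reruns; to use it, call load_model_cached(category) in place of
# load_model(category) below.
@st.cache_resource
def load_model_cached(category):
    return load_model(category)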
# Function to score a document and return the probability for class '1'
def score_document(model, tokenizer, text_data):
    if isinstance(text_data, str):
        text_data = [text_data]
    # Tokenize the input (the models accept at most 512 tokens; longer text is truncated)
    inputs = tokenizer(text_data, return_tensors="pt", padding=True, truncation=True, max_length=512)
    inputs = inputs.to(device)
    # Perform the prediction
    with torch.no_grad():
        outputs = model(**inputs)
    # Get probabilities (softmax)
    probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1)
    # Get the probability associated with class '1'
    probability_class_1 = probabilities[:, 1].tolist()  # Return as a list
    return probability_class_1
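
# Illustrative usage (values are hypothetical): score_document(model, tokenizer,
# "We face substantial liquidity risk ...") returns a one-element list such as [0.87];
# passing a list of N strings returns N probabilities, one per input text.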
# Function to find the relevant text column
def get_text_column(df):
    possible_columns = ['Item 1A', 'Item 1A.', 'Item 1A. Risk Factors', 'text', 'Text']
    for col in possible_columns:
        if col in df.columns:
            return col
    return None  # Return None if no matching column is found
# Dropdown to select file type
file_type = st.selectbox("Select the file type to upload:", ["CSV", "TXT"])
# Track the start time
start_time = time.time()
# Handle CSV or TXT upload
if file_type == "CSV":
csv_file = st.file_uploader("Upload a CSV file with text data", type=["csv"])
if csv_file is not None:
# Read the CSV file
df = pd.read_csv(csv_file)
# Find the relevant text column
text_column = get_text_column(df)
if text_column is None:
st.error("No valid text column found. Please ensure your CSV contains 'Item 1A', 'Item 1A.', 'Item 1A. Risk Factors', 'Text', or 'text'.")
else:
# Extract text data from the identified column
text_data = df[text_column].dropna().tolist() # Extracts all non-empty rows
# Initialize an empty DataFrame for results
result_df = df.copy()
# Progress bar
progress_bar = st.progress(0)
total_categories = len(model_directories)
for i, category in enumerate(tqdm(model_directories.keys(), desc="Scoring documents")):
# Load the pre-trained model for the current category
model, tokenizer = load_model(category)
# Skip the category if model loading fails
if model is not None:
# Score the document for each row in the text data
category_scores = []
for text in text_data:
probability = score_document(model, tokenizer, text)
category_scores.append(probability[0]) # Extract the first (and only) value
# Add the results to the DataFrame
result_df[category.capitalize()] = category_scores
# Update the progress bar
progress_bar.progress((i + 1) / total_categories)
# Estimate remaining time
elapsed_time = time.time() - start_time
estimated_total_time = (elapsed_time / (i + 1)) * total_categories
st.write(f"Elapsed time: {elapsed_time:.2f}s, Estimated time remaining: {estimated_total_time - elapsed_time:.2f}s")
# Save results to CSV
csv = result_df.to_csv(index=False).encode('utf-8')
st.download_button(
label="Download results as CSV",
data=csv,
file_name="document_scoring_results.csv",
mime="text/csv",
)
# Display completion message
st.success("Document scoring complete!")
elif file_type == "TXT":
doc_file = st.file_uploader("Upload a TXT file", type=["txt"])
if doc_file is not None:
# Read the content of the uploaded .txt file
text_data = doc_file.read().decode("utf-8")
# Initialize an empty DataFrame for results
result_df = pd.DataFrame(columns=["Category", "Probability"])
# Progress bar
progress_bar = st.progress(0)
total_categories = len(model_directories)
for i, category in enumerate(tqdm(model_directories.keys(), desc="Scoring documents")):
# Load the pre-trained model for the current category
model, tokenizer = load_model(category)
# Skip the category if model loading fails
if model is not None:
# Score the document
probability = score_document(model, tokenizer, text_data)
# Create a DataFrame for the current result
new_row = pd.DataFrame({
"Category": [category],
"Probability": [probability[0]] # Extract the first value
})
# Use pd.concat to append the new row to the DataFrame
result_df = pd.concat([result_df, new_row], ignore_index=True)
# Update the progress bar
progress_bar.progress((i + 1) / total_categories)
# Estimate remaining time
elapsed_time = time.time() - start_time
estimated_total_time = (elapsed_time / (i + 1)) * total_categories
st.write(f"Elapsed time: {elapsed_time:.2f}s, Estimated time remaining: {estimated_total_time - elapsed_time:.2f}s")
# Save results to CSV
csv = result_df.to_csv(index=False).encode('utf-8')
st.download_button(
label="Download results as CSV",
data=csv,
file_name="document_scoring_results.csv",
mime="text/csv",
)
# Display completion message
st.success("Document scoring complete!")
st.write("Note: Ensure the uploaded document is formatted correctly. The models are limited to 512 tokens and will be upgraded in a future version.")