mgmtprofessor's picture
Update app.py
fa4a747 verified
import os
import streamlit as st
import torch
import pandas as pd
import time
from tqdm import tqdm
from transformers import AutoModelForSequenceClassification, AutoTokenizer
# Page header: app title followed by the upload instructions for each
# supported file type. The long messages are split across adjacent string
# literals purely for readability; the rendered text is unchanged.
st.title("An App to Score Firm-Generated Text on Eight Risk Factors")
st.write(
    "Note: You can either upload a CSV file or a single TXT file for scoring."
)
st.write(
    "If uploading a CSV file, ensure that it contains the following columns: "
    "cik, fyear, Item 1A (or Text). "
    "Item 1A should contain the respective risk factors section "
    "for each firm-year observation."
)
st.write(
    "If uploading a txt file, ensure it contains the respective "
    "risk factors section for each firm-year observation."
)
# Hugging Face Hub repository for each risk category's classifier.
# Every repository follows the pattern "mgmtprofessor/<category>_risk_factors".
# Insertion order matters: the scoring loops iterate this dict in order.
model_directories = {
    category: f"mgmtprofessor/{category}_risk_factors"
    for category in (
        "finance",
        "accounting",
        "technology",
        "international",
        "operations",
        "marketing",
        "management",
        "legal",
    )
}

# True when PyTorch reports that CUDA is available on this machine.
use_cuda = torch.cuda.is_available()
def load_model(category):
    """Load the classifier and tokenizer registered for ``category``.

    The repository name is looked up in ``model_directories`` and loaded with
    the ``transformers`` auto classes. Problems are reported to the user via
    ``st.error`` rather than raised.

    Args:
        category: Key into ``model_directories`` (e.g. ``'finance'``).

    Returns:
        ``(model, tokenizer)`` on success; ``(None, None)`` when the category
        has no registered repository or loading raised an exception.
    """
    try:
        repo_id = model_directories.get(category)
        if not repo_id:
            st.error(f"No Hugging Face model found for {category}")
            return None, None

        # Tokenizer first, then the model - same order as before so the first
        # failure reported to the user is unchanged.
        tokenizer = AutoTokenizer.from_pretrained(repo_id)
        classifier = AutoModelForSequenceClassification.from_pretrained(repo_id)
        return classifier, tokenizer
    except Exception as exc:
        # UI boundary: surface any loading problem in the page and let the
        # caller skip this category.
        st.error(f"Failed to load model for {category}: {exc}")
        return None, None
def score_document(model, tokenizer, text_data):
    """Return the class-1 probability for each input document.

    Args:
        model: Callable accepting the tokenizer output as keyword arguments
            and returning an object with a ``logits`` tensor.
        tokenizer: Callable that turns a list of strings into a mapping of
            tensors; it is invoked with ``return_tensors="pt"``,
            ``padding=True`` and ``truncation=True``.
        text_data: A single string or a list of strings.

    Returns:
        A list of floats, one per document: the softmax probability at
        index 1 of the last logits dimension.
    """
    # A lone string is treated as a batch of one.
    batch = [text_data] if isinstance(text_data, str) else text_data

    encoded = tokenizer(batch, return_tensors="pt", padding=True, truncation=True)

    # Inference only: no gradients are needed.
    with torch.no_grad():
        logits = model(**encoded).logits
        class_probs = torch.softmax(logits, dim=-1)

    return class_probs[:, 1].tolist()
def get_text_column(df):
    """Return the name of the column that holds the text to score.

    The candidate names are tried in priority order and the first one present
    in ``df.columns`` wins.

    Args:
        df: DataFrame built from the uploaded CSV.

    Returns:
        The matching column name, or ``None`` when no candidate is present.
    """
    candidates = ('Item 1A', 'Item 1A.', 'Item 1A. Risk Factors', 'text', 'Text')
    return next((name for name in candidates if name in df.columns), None)
def _score_all_categories(texts, started_at):
    """Score ``texts`` with every category model, reporting progress in the UI.

    Args:
        texts: List of document strings; each one is scored on its own.
        started_at: ``time.time()`` reading used as the origin for the
            elapsed / remaining time messages.

    Returns:
        Dict mapping category name to a list of class-1 probabilities, one
        per entry in ``texts``. Categories whose model failed to load are
        absent from the dict.
    """
    progress_bar = st.progress(0)
    total_categories = len(model_directories)
    scores_by_category = {}

    for i, category in enumerate(tqdm(model_directories, desc="Scoring documents")):
        model, tokenizer = load_model(category)

        # load_model has already reported any failure; just skip the category.
        if model is not None:
            scores_by_category[category] = [
                score_document(model, tokenizer, text)[0] for text in texts
            ]

        progress_bar.progress((i + 1) / total_categories)

        # Linear extrapolation from the average time per finished category.
        elapsed_time = time.time() - started_at
        estimated_total_time = (elapsed_time / (i + 1)) * total_categories
        st.write(
            f"Elapsed time: {elapsed_time:.2f}s, "
            f"Estimated time remaining: {estimated_total_time - elapsed_time:.2f}s"
        )

    return scores_by_category


def _offer_results_download(result_df):
    """Render the CSV download button and the completion message."""
    csv_bytes = result_df.to_csv(index=False).encode('utf-8')
    st.download_button(
        label="Download results as CSV",
        data=csv_bytes,
        file_name="document_scoring_results.csv",
        mime="text/csv",
    )
    st.success("Document scoring complete!")


def _handle_csv_upload(csv_file, started_at):
    """Score every row of an uploaded CSV and offer the results for download.

    One column per category (capitalized name) is added to a copy of the
    input. Rows whose text cell is missing are kept in the output with empty
    scores instead of breaking the column assignment.
    """
    try:
        df = pd.read_csv(csv_file)
    except (pd.errors.EmptyDataError, pd.errors.ParserError, UnicodeDecodeError) as exc:
        # Show a readable message instead of a traceback for unreadable files.
        st.error(f"Could not read the uploaded CSV file: {exc}")
        return

    text_column = get_text_column(df)
    if text_column is None:
        st.error("No valid text column found. Please ensure your CSV contains 'Item 1A', 'Item 1A.', 'Item 1A. Risk Factors', 'Text', or 'text'.")
        return

    # Only rows that actually contain text are scored. Their index labels are
    # remembered so the scores can be aligned back onto the full frame.
    has_text = df[text_column].notna()
    scored_index = df.index[has_text]
    texts = df.loc[has_text, text_column].astype(str).tolist()

    result_df = df.copy()
    for category, scores in _score_all_categories(texts, started_at).items():
        # Assigning a Series aligns on the index, so rows without text get
        # NaN rather than triggering a length-mismatch ValueError.
        result_df[category.capitalize()] = pd.Series(
            scores, index=scored_index, dtype="float64"
        )

    _offer_results_download(result_df)


def _handle_txt_upload(doc_file, started_at):
    """Score a single uploaded TXT document and offer the results for download.

    The output has one row per successfully loaded category with the columns
    ``Category`` and ``Probability``.
    """
    # Undecodable bytes are replaced rather than aborting the whole upload.
    text = doc_file.read().decode("utf-8", errors="replace")

    scores_by_category = _score_all_categories([text], started_at)

    # Build the frame once from the collected rows; the explicit column list
    # keeps the header even when no model could be loaded.
    result_df = pd.DataFrame(
        [
            {"Category": category, "Probability": scores[0]}
            for category, scores in scores_by_category.items()
        ],
        columns=["Category", "Probability"],
    )

    _offer_results_download(result_df)


# Dropdown to select file type
file_type = st.selectbox("Select the file type to upload:", ["CSV", "TXT"])

# Track the start time (origin for the elapsed / remaining time messages)
start_time = time.time()

# Handle CSV or TXT upload
if file_type == "CSV":
    csv_file = st.file_uploader("Upload a CSV file with text data", type=["csv"])
    if csv_file is not None:
        _handle_csv_upload(csv_file, start_time)
elif file_type == "TXT":
    doc_file = st.file_uploader("Upload a TXT file", type=["txt"])
    if doc_file is not None:
        _handle_txt_upload(doc_file, start_time)

st.write("Note: Ensure the uploaded document is formatted correctly. The models are limited to 512 tokens and will be upgraded in a future version.")