import time

import pandas as pd
import streamlit as st
import torch
from tqdm import tqdm
from transformers import AutoModelForSequenceClassification, AutoTokenizer

st.title("An App to Score Firm-Generated Text on Eight Risk Factors")
st.write("Note: You can upload either a CSV file or a single TXT file for scoring.")
st.write("If uploading a CSV file, ensure that it contains the following columns: cik, fyear, and Item 1A (or Text). Item 1A should contain the risk factors section for each firm-year observation.")
st.write("If uploading a TXT file, ensure it contains the risk factors section for a single firm-year observation.")

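# Illustrative sketch of the expected CSV layout (the cik/fyear values and text below
# are hypothetical placeholders, not real filings):
#
#   cik,fyear,Item 1A
#   320193,2023,"We face intense competition in all of our markets ..."
#   789019,2022,"Our operations depend on complex information technology systems ..."
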
# Hugging Face model repositories, one per risk-factor category.
model_directories = {
    'finance': 'mgmtprofessor/finance_risk_factors',
    'accounting': 'mgmtprofessor/accounting_risk_factors',
    'technology': 'mgmtprofessor/technology_risk_factors',
    'international': 'mgmtprofessor/international_risk_factors',
    'operations': 'mgmtprofessor/operations_risk_factors',
    'marketing': 'mgmtprofessor/marketing_risk_factors',
    'management': 'mgmtprofessor/management_risk_factors',
    'legal': 'mgmtprofessor/legal_risk_factors'
}

# Run inference on a GPU when one is available; otherwise fall back to the CPU.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")


def load_model(category):
    """Load the fine-tuned classifier and tokenizer for a risk-factor category."""
    try:
        model_name = model_directories.get(category)
        if model_name:
            tokenizer = AutoTokenizer.from_pretrained(model_name)
            model = AutoModelForSequenceClassification.from_pretrained(model_name)
            model.to(device)
            model.eval()
            return model, tokenizer
        else:
            st.error(f"No Hugging Face model found for {category}")
            return None, None
    except Exception as e:
        st.error(f"Failed to load model for {category}: {e}")
        return None, None


def score_document(model, tokenizer, text_data):
    """Return the probability of class 1 (risk factor present) for each input text."""
    if isinstance(text_data, str):
        text_data = [text_data]

    # The models read at most 512 tokens; longer documents are truncated.
    inputs = tokenizer(text_data, return_tensors="pt", padding=True, truncation=True)
    inputs = {key: value.to(device) for key, value in inputs.items()}

    with torch.no_grad():
        outputs = model(**inputs)

    probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1)

    # Column 1 of the binary classifier's output is the probability that the risk factor is present.
    probability_class_1 = probabilities[:, 1].tolist()

    return probability_class_1


def get_text_column(df):
    """Return the name of the first column that holds the risk-factors text, or None."""
    possible_columns = ['Item 1A', 'Item 1A.', 'Item 1A. Risk Factors', 'text', 'Text']
    for col in possible_columns:
        if col in df.columns:
            return col
    return None

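# Quick sanity check outside the app (a hypothetical snippet, not executed by Streamlit):
#
#   model, tokenizer = load_model("finance")
#   print(score_document(model, tokenizer, "We may be unable to refinance our debt."))
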
file_type = st.selectbox("Select the file type to upload:", ["CSV", "TXT"])

# Used below to report elapsed time and a rough estimate of the time remaining.
start_time = time.time()

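# Two flows are supported: a CSV of many firm-year observations (each category's score
# becomes a new column) or a single TXT document (one result row per category).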
if file_type == "CSV":
    csv_file = st.file_uploader("Upload a CSV file with text data", type=["csv"])

    if csv_file is not None:
        df = pd.read_csv(csv_file)

        text_column = get_text_column(df)

        if text_column is None:
            st.error("No valid text column found. Please ensure your CSV contains 'Item 1A', 'Item 1A.', 'Item 1A. Risk Factors', 'Text', or 'text'.")
        else:
            # Keep one entry per row (an empty string where text is missing) so the
            # score columns stay aligned with the original rows.
            text_data = df[text_column].fillna("").astype(str).tolist()

            result_df = df.copy()

            progress_bar = st.progress(0)
            total_categories = len(model_directories)

            # Score every document against each category model in turn.
            for i, category in enumerate(tqdm(model_directories.keys(), desc="Scoring documents")):
                model, tokenizer = load_model(category)

                if model is not None:
                    category_scores = []
                    for text in text_data:
                        probability = score_document(model, tokenizer, text)
                        category_scores.append(probability[0])

                    result_df[category.capitalize()] = category_scores

                progress_bar.progress((i + 1) / total_categories)

                elapsed_time = time.time() - start_time
                estimated_total_time = (elapsed_time / (i + 1)) * total_categories
                st.write(f"Elapsed time: {elapsed_time:.2f}s, Estimated time remaining: {estimated_total_time - elapsed_time:.2f}s")

            csv = result_df.to_csv(index=False).encode('utf-8')
            st.download_button(
                label="Download results as CSV",
                data=csv,
                file_name="document_scoring_results.csv",
                mime="text/csv",
            )

            st.success("Document scoring complete!")

elif file_type == "TXT":
    doc_file = st.file_uploader("Upload a TXT file", type=["txt"])

    if doc_file is not None:
        text_data = doc_file.read().decode("utf-8")

        # One result row per category: the probability that the document discusses that risk.
        result_df = pd.DataFrame(columns=["Category", "Probability"])

        progress_bar = st.progress(0)
        total_categories = len(model_directories)

        for i, category in enumerate(tqdm(model_directories.keys(), desc="Scoring documents")):
            model, tokenizer = load_model(category)

            if model is not None:
                probability = score_document(model, tokenizer, text_data)

                new_row = pd.DataFrame({
                    "Category": [category],
                    "Probability": [probability[0]]
                })

                result_df = pd.concat([result_df, new_row], ignore_index=True)

            progress_bar.progress((i + 1) / total_categories)

            elapsed_time = time.time() - start_time
            estimated_total_time = (elapsed_time / (i + 1)) * total_categories
            st.write(f"Elapsed time: {elapsed_time:.2f}s, Estimated time remaining: {estimated_total_time - elapsed_time:.2f}s")

        csv = result_df.to_csv(index=False).encode('utf-8')
        st.download_button(
            label="Download results as CSV",
            data=csv,
            file_name="document_scoring_results.csv",
            mime="text/csv",
        )

        st.success("Document scoring complete!")

st.write("Note: Ensure the uploaded document is formatted correctly. The models currently read at most 512 tokens (longer documents are truncated); this limit will be raised in a future version.")
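
# To run the app locally (assuming Streamlit is installed and this file is saved as app.py):
#   streamlit run app.py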