import os
import time

import pandas as pd
import streamlit as st
import torch
from tqdm import tqdm
from transformers import AutoModelForSequenceClassification, AutoTokenizer

# Set up Streamlit app
st.title("An App to Score Firm-Generated Text on Eight Risk Factors")
st.write("Note: You can either upload a CSV file or a single TXT file for scoring.")
st.write("If uploading a CSV file, ensure that it contains the following columns: cik, fyear, Item 1A (or Text). Item 1A should contain the respective risk factors section for each firm-year observation.")
st.write("If uploading a txt file, ensure it contains the respective risk factors section for each firm-year observation.")

# Hugging Face model directories, keyed by risk-factor category
model_directories = {
    'finance': 'mgmtprofessor/finance_risk_factors',
    'accounting': 'mgmtprofessor/accounting_risk_factors',
    'technology': 'mgmtprofessor/technology_risk_factors',
    'international': 'mgmtprofessor/international_risk_factors',
    'operations': 'mgmtprofessor/operations_risk_factors',
    'marketing': 'mgmtprofessor/marketing_risk_factors',
    'management': 'mgmtprofessor/management_risk_factors',
    'legal': 'mgmtprofessor/legal_risk_factors',
}

# Check if CUDA is available
use_cuda = torch.cuda.is_available()

# Upper bound on tokens fed to a model; mirrors the 512-token limit stated in
# the note shown at the bottom of the app.
MAX_TOKENS = 512


def load_model(category):
    """Load the classifier and tokenizer registered for *category*.

    Args:
        category: A key of ``model_directories``.

    Returns:
        A ``(model, tokenizer)`` pair, or ``(None, None)`` when the category
        is unknown or loading fails. Failures are reported with ``st.error``
        instead of being raised.
    """
    model_name = model_directories.get(category)
    if not model_name:
        st.error(f"No Hugging Face model found for {category}")
        return None, None

    try:
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModelForSequenceClassification.from_pretrained(model_name)
        if use_cuda:
            # Previously `use_cuda` was computed but never used, so inference
            # always ran on the CPU even when a GPU was present.
            model = model.to("cuda")
    except Exception as e:  # UI boundary: report the failure and skip this category
        st.error(f"Failed to load model for {category}: {e}")
        return None, None
    return model, tokenizer


def score_document(model, tokenizer, text_data):
    """Return the class-'1' probability for each input text.

    Args:
        model: A sequence-classification model as returned by ``load_model``.
        tokenizer: The tokenizer that belongs to *model*.
        text_data: A single string or a list of strings.

    Returns:
        A list of floats, one per input text.
    """
    if isinstance(text_data, str):
        text_data = [text_data]

    # Truncate explicitly instead of relying on the tokenizer's configured
    # default, which is not guaranteed to be set for every checkpoint.
    inputs = tokenizer(
        text_data,
        return_tensors="pt",
        padding=True,
        truncation=True,
        max_length=MAX_TOKENS,
    )
    # Keep the inputs on the same device as the model (no-op on CPU).
    inputs = inputs.to(model.device)

    with torch.no_grad():
        outputs = model(**inputs)

    # Softmax over the label dimension, then keep the column for class '1'.
    probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1)
    return probabilities[:, 1].tolist()


def get_text_column(df):
    """Return the first recognised text column name in *df*, or ``None``."""
    possible_columns = ['Item 1A', 'Item 1A.', 'Item 1A. Risk Factors', 'text', 'Text']
    for col in possible_columns:
        if col in df.columns:
            return col
    return None  # No matching column is present


def _score_all_categories(texts):
    """Score *texts* with every category model, reporting progress in the UI.

    Args:
        texts: List of strings to score.

    Returns:
        A dict mapping each category whose model loaded successfully to a
        list of class-'1' probabilities, in the same order as *texts*.
    """
    # Start timing here so the estimate covers scoring only, not the time the
    # page spent waiting for an upload or parsing the file.
    start_time = time.time()
    progress_bar = st.progress(0)
    total_categories = len(model_directories)
    scores_by_category = {}

    for i, category in enumerate(tqdm(model_directories, desc="Scoring documents")):
        model, tokenizer = load_model(category)

        # Skip the category if model loading fails
        if model is not None:
            scores_by_category[category] = [
                score_document(model, tokenizer, text)[0] for text in texts
            ]

        # Advance the bar even for skipped categories so it always completes.
        progress_bar.progress((i + 1) / total_categories)

        elapsed_time = time.time() - start_time
        estimated_total_time = (elapsed_time / (i + 1)) * total_categories
        st.write(f"Elapsed time: {elapsed_time:.2f}s, Estimated time remaining: {estimated_total_time - elapsed_time:.2f}s")

    return scores_by_category


def _offer_download(result_df):
    """Render the CSV download button and the completion message."""
    csv = result_df.to_csv(index=False).encode('utf-8')
    st.download_button(
        label="Download results as CSV",
        data=csv,
        file_name="document_scoring_results.csv",
        mime="text/csv",
    )
    st.success("Document scoring complete!")


# NOTE(review): Streamlit reruns this script on widget interaction, so
# clicking the download button presumably re-scores the upload; consider
# caching results in st.session_state -- confirm against the deployed version.

# Dropdown to select file type
file_type = st.selectbox("Select the file type to upload:", ["CSV", "TXT"])

# Handle CSV or TXT upload
if file_type == "CSV":
    csv_file = st.file_uploader("Upload a CSV file with text data", type=["csv"])

    if csv_file is not None:
        df = pd.read_csv(csv_file)
        text_column = get_text_column(df)

        if text_column is None:
            st.error("No valid text column found. Please ensure your CSV contains 'Item 1A', 'Item 1A.', 'Item 1A. Risk Factors', 'Text', or 'text'.")
        else:
            # Keep the original row index of every non-empty cell so scores
            # can be aligned back onto the full table. Cast to str because
            # the tokenizer rejects non-string cells (e.g. numbers).
            texts = df[text_column].dropna().astype(str)
            scores_by_category = _score_all_categories(texts.tolist())

            result_df = df.copy()
            for category, scores in scores_by_category.items():
                # Assigning an indexed Series leaves NaN in rows whose text
                # was missing; assigning the bare list raised a length
                # mismatch whenever dropna() removed a row.
                result_df[category.capitalize()] = pd.Series(
                    scores, index=texts.index, dtype="float64"
                )

            _offer_download(result_df)

elif file_type == "TXT":
    doc_file = st.file_uploader("Upload a TXT file", type=["txt"])

    if doc_file is not None:
        # Replace undecodable bytes rather than crashing on files that are
        # not strictly UTF-8.
        text_data = doc_file.read().decode("utf-8", errors="replace")

        scores_by_category = _score_all_categories([text_data])

        # Build the table in one step instead of concatenating onto an empty
        # DataFrame inside the loop.
        result_df = pd.DataFrame(
            {
                "Category": list(scores_by_category),
                "Probability": [scores[0] for scores in scores_by_category.values()],
            },
            columns=["Category", "Probability"],
        )

        _offer_download(result_df)

st.write("Note: Ensure the uploaded document is formatted correctly. The models are limited to 512 tokens and will be upgraded in a future version.")