# NOTE: page-capture residue, not part of the program: "Spaces: Sleeping Sleeping"
import gradio as gr | |
import pandas as pd | |
import json | |
from transformers import ( | |
AutoTokenizer, | |
AutoModelForCausalLM, | |
AutoModelForSequenceClassification, | |
TrainingArguments, | |
Trainer | |
) | |
import torch | |
import numpy as np | |
from torch.utils.data import Dataset, DataLoader | |
import re | |
class FinancialDataset(Dataset):
    """Map-style dataset pairing raw texts with integer labels.

    Each item is tokenized on access by calling ``tokenizer`` with
    ``truncation=True``, ``padding='max_length'`` and ``return_tensors='pt'``.

    Args:
        texts: Indexable collection of texts; every entry is passed through
            ``str()`` before tokenization.
        labels: Indexable collection of labels, parallel to ``texts``; each
            one is converted to a ``torch.long`` tensor.
        tokenizer: Callable returning a mapping with ``'input_ids'`` and
            ``'attention_mask'`` tensors.
        max_length: Length every sequence is padded/truncated to.
    """

    def __init__(self, texts, labels, tokenizer, max_length=512):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Return the number of texts in the dataset."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the tokenized text and label tensor for position ``idx``."""
        encoded = self.tokenizer(
            str(self.texts[idx]),
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt',
        )
        # The tokenizer is expected to return a leading batch axis of size 1.
        # Drop only that axis: a bare ``squeeze()`` would also collapse the
        # sequence axis whenever ``max_length`` is 1.
        return {
            'input_ids': encoded['input_ids'].squeeze(0),
            'attention_mask': encoded['attention_mask'].squeeze(0),
            'labels': torch.tensor(self.labels[idx], dtype=torch.long),
        }
class FinancialAnalyzer:
    """Turn Markdown financial statements into metrics, sentiment and a narrative.

    The pipeline run by :meth:`analyze_financials` is:

    1. parse the Markdown tables of both statements,
    2. extract headline figures and derive ratios,
    3. score a short textual summary of the ratios with FinBERT,
    4. ask TinyLlama for a written analysis (with a rule-based fallback),
    5. return everything as a JSON string.

    Attributes:
        last_metrics: Metrics from the most recent analysis; used by the
            fallback narrative when text generation fails or is too short.
    """

    # One cell of a Markdown table delimiter row: ``---``, ``:---``, ``:-:`` ...
    _SEPARATOR_CELL = re.compile(r":?-+:?")

    def __init__(self):
        print("Initializing Analyzer...")
        self.last_metrics = {}
        self.initialize_models()
        print("Initialization complete!")

    def initialize_models(self):
        """Load the TinyLlama and FinBERT checkpoints and put both in eval mode.

        Raises:
            Exception: Any loading error is printed and then re-raised.
        """
        try:
            # Initialize TinyLlama
            self.llama_tokenizer = AutoTokenizer.from_pretrained("TinyLlama/TinyLlama-1.1B-Chat-v1.0")
            self.llama_model = AutoModelForCausalLM.from_pretrained("TinyLlama/TinyLlama-1.1B-Chat-v1.0")
            self.llama_model.eval()
            # Initialize FinBERT
            self.finbert_tokenizer = AutoTokenizer.from_pretrained("ProsusAI/finbert")
            self.finbert_model = AutoModelForSequenceClassification.from_pretrained("ProsusAI/finbert")
            self.finbert_model.eval()
            print("Models loaded successfully!")
        except Exception as e:
            print(f"Error initializing models: {str(e)}")
            raise

    def clean_number(self, value):
        """Convert a table cell to ``float``; return ``0.0`` when not numeric.

        Strings have ``$`` and thousands separators removed, and a value
        written in parentheses (accounting notation) becomes negative.
        Falsy values (``None``, ``""``, ``0``) yield ``0.0``.
        """
        try:
            if isinstance(value, str):
                value = value.replace('$', '').replace(',', '').strip()
                if '(' in value and ')' in value:
                    value = '-' + value.replace('(', '').replace(')', '').strip()
            return float(value or 0)
        except (TypeError, ValueError, OverflowError):
            return 0.0

    def is_valid_markdown(self, file_path):
        """Return ``True`` if the file has at least one heading or table line.

        A line counts when it starts with ``#`` or contains ``|``. Unreadable
        files, undecodable content and a missing path (``None``) give ``False``.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return any(line.startswith('#') or '|' in line for line in f)
        except (OSError, TypeError, ValueError):
            # ValueError also covers UnicodeDecodeError for non-UTF-8 files.
            return False

    @staticmethod
    def _split_table_row(line):
        """Split one Markdown table line into stripped cell strings."""
        stripped = line.strip()
        # Outer pipes are optional in Markdown; remove at most one per side so
        # that genuinely empty edge cells are preserved.
        if stripped.startswith('|'):
            stripped = stripped[1:]
        if stripped.endswith('|'):
            stripped = stripped[:-1]
        return [cell.strip() for cell in stripped.split('|')]

    def parse_financial_data(self, content):
        """Parse Markdown text into ``{section: {item: {column: value}}}``.

        Lines starting with ``#`` open a new section (named by the heading
        text). Within a section the first table row is the header row and the
        remaining rows are data; delimiter rows such as ``| --- | :---: |`` and
        rows whose cells are all empty are ignored.

        Returns:
            The nested dict, or ``{}`` if ``content`` is not a string.
        """
        try:
            data = {}
            current_section = ""
            current_table = []
            headers = None
            for line in content.split('\n'):
                if line.startswith('#'):
                    # A heading closes the table collected for the previous section.
                    if current_table and headers:
                        data[current_section] = self.process_table(headers, current_table)
                    current_section = line.strip('# ')
                    current_table = []
                    headers = None
                    continue
                if '|' not in line:
                    continue
                cells = self._split_table_row(line)
                if not any(cells):
                    continue
                if all(self._SEPARATOR_CELL.fullmatch(cell) for cell in cells):
                    continue
                if not headers:
                    headers = cells
                else:
                    current_table.append(cells)
            if current_table and headers:
                data[current_section] = self.process_table(headers, current_table)
            return data
        except (AttributeError, TypeError) as e:
            print(f"Error parsing financial data: {str(e)}")
            return {}

    def process_table(self, headers, rows):
        """Convert table rows into ``{item_name: {header: float}}``.

        The first cell of each row (with surrounding ``*`` removed) is the
        item name; the other cells are converted with :meth:`clean_number`
        and keyed by the matching header. Rows whose length differs from the
        header row are skipped.
        """
        try:
            processed_data = {}
            for row in rows:
                if not row or len(row) != len(headers):
                    continue
                item_name = row[0].strip('*').strip()
                processed_data[item_name] = {
                    header: self.clean_number(cell)
                    for header, cell in zip(headers[1:], row[1:])
                }
            return processed_data
        except (AttributeError, IndexError, TypeError) as e:
            print(f"Error processing table: {str(e)}")
            return {}

    def get_nested_value(self, data, section, key, year):
        """Return ``data[section][key][str(year)]``, or ``0`` if any level is missing."""
        try:
            return data.get(section, {}).get(key, {}).get(str(year), 0)
        except (AttributeError, TypeError):
            # Some level was not a dict (or a key was unhashable).
            return 0

    def extract_metrics(self, income_data, balance_data):
        """Collect headline figures and derive ratios from the parsed statements.

        Args:
            income_data: Parsed income statement (see :meth:`parse_financial_data`).
            balance_data: Parsed balance sheet.

        Returns:
            A dict with ``Revenue``, ``Profitability``, ``Balance_Sheet``,
            ``Cash_Flow`` and ``Ratios`` groups. ``Ratios`` is empty when 2025
            revenue is zero; an individual ratio is ``0`` when its denominator
            is zero. Returns ``{}`` if the figures are not numeric.
        """
        def income(section, key, year="2025"):
            return self.get_nested_value(income_data, section, key, year)

        def balance(section, key, year="2025"):
            return self.get_nested_value(balance_data, section, key, year)

        def ratio(numerator, denominator):
            return numerator / denominator if denominator != 0 else 0

        try:
            metrics = {
                "Revenue": {
                    "2025": income("Revenue", "Total Net Revenue", "2025"),
                    "2024": income("Revenue", "Total Net Revenue", "2024"),
                    "2021": income("Revenue", "Total Net Revenue", "2021"),
                },
                "Profitability": {
                    "Gross_Profit_2025": income("Cost and Gross Profit", "Gross Profit"),
                    "EBIT_2025": income("Profit Summary", "EBIT"),
                    "Net_Earnings_2025": income("Profit Summary", "Net Earnings"),
                    "Operating_Expenses_2025": income("Operating Expenses", "Total Operating Expenses"),
                },
                "Balance_Sheet": {
                    "Total_Assets_2025": balance("Key Totals", "Total_Assets"),
                    "Current_Assets_2025": balance("Key Totals", "Total_Current_Assets"),
                    "Total_Liabilities_2025": balance("Key Totals", "Total_Liabilities"),
                    "Current_Liabilities_2025": balance("Key Totals", "Total_Current_Liabilities"),
                    "Equity_2025": balance("Key Totals", "Total_Shareholders_Equity"),
                    "Inventory_2025": balance("Balance Sheet Data 2021-2025", "Inventory"),
                    "Accounts_Receivable_2025": balance("Balance Sheet Data 2021-2025", "Accounts_Receivable"),
                    "Long_Term_Debt_2025": balance("Balance Sheet Data 2021-2025", "Long_Term_Debt"),
                },
                "Cash_Flow": {
                    "Depreciation_2025": income("Operating Expenses", "Depreciation & Amortization"),
                    "Interest_Expense_2025": income("Profit Summary", "Interest Expense"),
                },
                # Always present so downstream ``metrics['Ratios']`` lookups
                # succeed even when no ratio can be computed.
                "Ratios": {},
            }

            revenue = metrics["Revenue"]
            profit = metrics["Profitability"]
            sheet = metrics["Balance_Sheet"]
            interest_expense = metrics["Cash_Flow"]["Interest_Expense_2025"]
            revenue_2025 = revenue["2025"]

            if revenue_2025 != 0:
                revenue_growth = 0
                if revenue["2024"] != 0:
                    revenue_growth = ((revenue_2025 / revenue["2024"]) - 1) * 100

                cagr = 0
                if revenue["2021"] != 0:
                    multiple = revenue_2025 / revenue["2021"]
                    # A fractional power of a negative number is complex in
                    # Python; report 0 instead of an unformattable value.
                    if multiple > 0:
                        # 2021 -> 2025 spans four compounding periods.
                        cagr = (multiple ** (1 / 4) - 1) * 100

                metrics["Ratios"] = {
                    "Gross_Margin": (profit["Gross_Profit_2025"] / revenue_2025) * 100,
                    "Operating_Margin": (profit["EBIT_2025"] / revenue_2025) * 100,
                    "Net_Margin": (profit["Net_Earnings_2025"] / revenue_2025) * 100,
                    "Current_Ratio": ratio(sheet["Current_Assets_2025"], sheet["Current_Liabilities_2025"]),
                    "Quick_Ratio": ratio(
                        sheet["Current_Assets_2025"] - sheet["Inventory_2025"],
                        sheet["Current_Liabilities_2025"],
                    ),
                    "Asset_Turnover": ratio(revenue_2025, sheet["Total_Assets_2025"]),
                    "Receivables_Turnover": ratio(revenue_2025, sheet["Accounts_Receivable_2025"]),
                    "Debt_to_Equity": ratio(sheet["Total_Liabilities_2025"], sheet["Equity_2025"]),
                    "Interest_Coverage": ratio(profit["EBIT_2025"], interest_expense),
                    "Revenue_Growth": revenue_growth,
                    "5Year_Revenue_CAGR": cagr,
                }
            return metrics
        except (ArithmeticError, AttributeError, TypeError, ValueError) as e:
            print(f"Error extracting metrics: {str(e)}")
            return {}

    @staticmethod
    def convert_to_serializable(obj):
        """Recursively replace numpy scalars/arrays with native Python values.

        Dicts and lists are rebuilt with converted contents; any other object
        is returned unchanged.
        """
        if isinstance(obj, np.generic):
            # Covers every numpy scalar type, not only float32.
            return obj.item()
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, dict):
            return {
                key: FinancialAnalyzer.convert_to_serializable(value)
                for key, value in obj.items()
            }
        if isinstance(obj, list):
            return [FinancialAnalyzer.convert_to_serializable(item) for item in obj]
        return obj

    def get_sentiment_analysis(self, metrics):
        """Score a short textual summary of the ratios with FinBERT.

        Returns:
            ``{label: probability}`` using the label names stored in the
            model's own config, or ``{}`` if scoring fails (for example when
            ``metrics`` has no ``Ratios`` entry).
        """
        try:
            ratios = metrics['Ratios']
            financial_text = f"""
Revenue growth: {ratios.get('Revenue_Growth', 0):.2f}%
Profit margin: {ratios.get('Net_Margin', 0):.2f}%
Debt to equity: {ratios.get('Debt_to_Equity', 0):.2f}
Interest coverage: {ratios.get('Interest_Coverage', 0):.2f}
Current ratio: {ratios.get('Current_Ratio', 0):.2f}
"""
            inputs = self.finbert_tokenizer(financial_text, return_tensors="pt", padding=True, truncation=True)
            with torch.no_grad():  # inference only; no autograd graph needed
                outputs = self.finbert_model(**inputs)
            probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1)[0].tolist()
            # The checkpoint's config defines which output index means which
            # sentiment; reading it avoids relying on a hard-coded label order.
            id2label = self.finbert_model.config.id2label
            return {
                str(id2label[index]).lower(): float(score)
                for index, score in enumerate(probabilities)
            }
        except Exception as e:
            print(f"Error in sentiment analysis: {str(e)}")
            return {}

    def generate_prompt(self, metrics, sentiment_dict):
        """Build the instruction prompt from the metrics and sentiment scores.

        Returns:
            The prompt text, or ``""`` if a required metric is missing or not
            numeric.
        """
        try:
            ratios = metrics['Ratios']
            return f"""[INST] As a financial analyst, provide a comprehensive analysis of this company's performance.
Financial Metrics (2025):
------------------------
1. Revenue & Growth:
- Revenue: ${metrics['Revenue']['2025']:,.1f}M
- Growth Rate: {ratios.get('Revenue_Growth', 0):,.1f}%
- 5-Year CAGR: {ratios.get('5Year_Revenue_CAGR', 0):,.1f}%
2. Profitability:
- Gross Profit: ${metrics['Profitability']['Gross_Profit_2025']:,.1f}M
- EBIT: ${metrics['Profitability']['EBIT_2025']:,.1f}M
- Net Earnings: ${metrics['Profitability']['Net_Earnings_2025']:,.1f}M
- Margins:
* Gross: {ratios.get('Gross_Margin', 0):,.1f}%
* Operating: {ratios.get('Operating_Margin', 0):,.1f}%
* Net: {ratios.get('Net_Margin', 0):,.1f}%
3. Financial Position:
- Assets: ${metrics['Balance_Sheet']['Total_Assets_2025']:,.1f}M
- Liabilities: ${metrics['Balance_Sheet']['Total_Liabilities_2025']:,.1f}M
- Equity: ${metrics['Balance_Sheet']['Equity_2025']:,.1f}M
4. Key Ratios:
- Liquidity: Current Ratio {ratios.get('Current_Ratio', 0):,.2f}x
- Efficiency: Asset Turnover {ratios.get('Asset_Turnover', 0):,.2f}x
- Solvency: Debt/Equity {ratios.get('Debt_to_Equity', 0):,.2f}x
- Coverage: Interest Coverage {ratios.get('Interest_Coverage', 0):,.2f}x
Market Sentiment Indicators:
---------------------------
- Positive: {sentiment_dict.get('positive', 0):,.2f}
- Neutral: {sentiment_dict.get('neutral', 0):,.2f}
- Negative: {sentiment_dict.get('negative', 0):,.2f}
Provide:
1. Overall financial health assessment
2. Key strengths and concerns
3. Operational efficiency analysis
4. Recommendations for improvement
[/INST]"""
        except (AttributeError, KeyError, TypeError, ValueError) as e:
            print(f"Error generating prompt: {str(e)}")
            return ""

    def generate_analysis(self, prompt):
        """Generate the narrative with TinyLlama, falling back to rule-based text.

        The fallback (built from ``self.last_metrics``) is used when the
        prompt is empty, when generation raises, or when the generated text
        has fewer than 100 words.
        """
        if not prompt:
            # Prompt construction failed; generating from nothing is pointless.
            return self.generate_fallback_analysis(self.last_metrics)
        try:
            # NOTE(review): the <human>/<assistant> framing is kept from the
            # original code; confirm it against the model's chat template.
            formatted_prompt = f"<human>: {prompt}\n<assistant>: Let me analyze these financial metrics in detail."
            inputs = self.llama_tokenizer(
                formatted_prompt,
                return_tensors="pt",
                truncation=True,
                max_length=2048,
                padding=True
            )
            # ``length_penalty`` and ``early_stopping`` only apply to beam
            # search, so they are not passed to this sampling call.
            with torch.no_grad():
                outputs = self.llama_model.generate(
                    input_ids=inputs["input_ids"],
                    attention_mask=inputs["attention_mask"],
                    max_new_tokens=1024,
                    min_new_tokens=200,  # Ensure minimum length
                    temperature=0.8,
                    top_p=0.92,
                    do_sample=True,
                    repetition_penalty=1.2,
                    num_return_sequences=1,
                    pad_token_id=self.llama_tokenizer.eos_token_id,
                    eos_token_id=self.llama_tokenizer.eos_token_id,
                )
            # Skip special tokens so BOS/EOS markers do not leak into the text.
            analysis = self.llama_tokenizer.decode(outputs[0], skip_special_tokens=True)
            # The decoded text still contains the prompt; keep only what
            # follows the last assistant tag.
            if "<assistant>:" in analysis:
                analysis = analysis.split("<assistant>:")[-1].strip()
            # Clean up any remaining tags
            analysis = analysis.replace("<human>:", "").replace("<assistant>:", "").strip()
            if len(analysis.split()) < 100:
                # Too short to be a useful analysis.
                analysis = self.generate_fallback_analysis(self.last_metrics)
            return analysis
        except Exception as e:
            print(f"Detailed error in generate_analysis: {str(e)}")
            return self.generate_fallback_analysis(self.last_metrics)

    def generate_fallback_analysis(self, metrics):
        """Build a rule-based narrative from four ratios using fixed thresholds.

        Returns:
            The narrative text, or an ``"Error generating fallback analysis:"``
            message if ``metrics`` lacks usable ``Ratios``.
        """
        try:
            ratios = metrics['Ratios']
            revenue_growth = ratios.get('Revenue_Growth', 0)
            net_margin = ratios.get('Net_Margin', 0)
            current_ratio = ratios.get('Current_Ratio', 0)
            debt_to_equity = ratios.get('Debt_to_Equity', 0)

            growth_label = 'strong' if revenue_growth > 5 else 'moderate' if revenue_growth > 0 else 'weak'
            margin_label = 'strong' if net_margin > 10 else 'moderate' if net_margin > 5 else 'concerning'
            liquidity_label = 'very strong' if current_ratio > 2 else 'adequate' if current_ratio > 1 else 'concerning'
            leverage_label = 'conservative' if debt_to_equity < 0.5 else 'moderate' if debt_to_equity < 1 else 'aggressive'

            debt_advice = 'Consider debt reduction' if debt_to_equity > 0.5 else 'Maintain current debt levels'
            margin_advice = 'Focus on improving profit margins' if net_margin < 5 else 'Maintain profit efficiency'
            growth_advice = 'Implement growth strategies' if revenue_growth < 2 else 'Sustain growth momentum'

            analysis = f"""
Financial Analysis Summary:
1. Revenue and Growth:
The company shows a revenue growth of {revenue_growth:.1f}%, indicating {growth_label} growth performance.
2. Profitability:
With a net margin of {net_margin:.1f}%, the company demonstrates {margin_label} profitability levels.
3. Liquidity Position:
The current ratio of {current_ratio:.2f}x suggests {liquidity_label} liquidity position.
4. Financial Leverage:
With a debt-to-equity ratio of {debt_to_equity:.2f}, the company maintains {leverage_label} leverage.
Key Recommendations:
1. {debt_advice}
2. {margin_advice}
3. {growth_advice}
This analysis is based on key financial metrics and standard industry benchmarks.
"""
            return analysis
        except (AttributeError, KeyError, TypeError, ValueError) as e:
            return f"Error generating fallback analysis: {str(e)}"

    def fine_tune_models(self, train_texts, train_labels, epochs=3):
        """Fine-tune the TinyLlama model on the given texts and save it.

        The tuned model and tokenizer are written to
        ``./financial_model_tuned``. Errors are printed, not raised.

        NOTE(review): ``FinancialDataset`` yields one scalar label per text,
        while the model trained here is a causal language model; confirm the
        labels match the loss this model expects before relying on this.
        """
        try:
            # Prepare dataset
            train_dataset = FinancialDataset(train_texts, train_labels, self.llama_tokenizer)
            # No evaluation strategy is configured because no evaluation
            # dataset is supplied to the Trainer.
            training_args = TrainingArguments(
                output_dir="./financial_model_tuned",
                num_train_epochs=epochs,
                per_device_train_batch_size=4,
                logging_dir="./logs",
                logging_steps=10,
                save_steps=50,
                learning_rate=2e-5,
                weight_decay=0.01,
                warmup_steps=500,
            )
            trainer = Trainer(
                model=self.llama_model,
                args=training_args,
                train_dataset=train_dataset,
            )
            trainer.train()
            # Save the fine-tuned model
            self.llama_model.save_pretrained("./financial_model_tuned")
            self.llama_tokenizer.save_pretrained("./financial_model_tuned")
            print("Fine-tuning completed successfully!")
        except Exception as e:
            print(f"Error in fine-tuning: {str(e)}")
        finally:
            # Training switches the model to train mode; later generation
            # calls expect eval mode, as set in initialize_models().
            self.llama_model.eval()

    def analyze_financials(self, balance_sheet_file, income_stmt_file):
        """Run the full analysis on two Markdown files.

        Args:
            balance_sheet_file: Path to the balance sheet Markdown file.
            income_stmt_file: Path to the income statement Markdown file.

        Returns:
            A JSON string with metrics, sentiment and narrative, or a
            human-readable ``"Error ..."`` string if validation or any step
            fails.
        """
        try:
            # Validate input files
            if not (self.is_valid_markdown(balance_sheet_file) and self.is_valid_markdown(income_stmt_file)):
                return "Error: One or both files are invalid or not in Markdown format."
            # Read files
            with open(balance_sheet_file, 'r', encoding='utf-8') as f:
                balance_sheet = f.read()
            with open(income_stmt_file, 'r', encoding='utf-8') as f:
                income_stmt = f.read()
            # Process financial data
            income_data = self.parse_financial_data(income_stmt)
            balance_data = self.parse_financial_data(balance_sheet)
            metrics = self.extract_metrics(income_data, balance_data)
            # Remembered for the rule-based fallback in generate_analysis().
            self.last_metrics = metrics
            sentiment_dict = self.get_sentiment_analysis(metrics)
            prompt = self.generate_prompt(metrics, sentiment_dict)
            analysis = self.generate_analysis(prompt)
            # json.dumps cannot encode numpy values; normalise them first.
            metrics = self.convert_to_serializable(metrics)
            sentiment_dict = self.convert_to_serializable(sentiment_dict)
            results = {
                "Financial Analysis": {
                    "Key Metrics": metrics,
                    "Market Sentiment": sentiment_dict,
                    "AI Insights": analysis,
                    "Analysis Period": "2021-2025",
                    "Note": "All monetary values in millions ($M)"
                }
            }
            return json.dumps(results, indent=2)
        except Exception as e:
            return f"Error in analysis: {str(e)}\n\nDetails: {type(e).__name__}"
def create_interface():
    """Build the Gradio interface around a fresh ``FinancialAnalyzer``.

    Constructing the analyzer runs its ``__init__``, which loads the models,
    so this call can be slow.

    Returns:
        A ``gr.Interface`` taking two uploaded files (balance sheet, income
        statement) as file paths and showing the analysis in a textbox.
    """
    analyzer = FinancialAnalyzer()
    # Order matters: it must match analyze_financials(balance_sheet_file,
    # income_stmt_file).
    file_inputs = [
        gr.File(label="Balance Sheet (Markdown)", type="filepath"),
        gr.File(label="Income Statement (Markdown)", type="filepath"),
    ]
    iface = gr.Interface(
        fn=analyzer.analyze_financials,
        inputs=file_inputs,
        outputs=gr.Textbox(label="Analysis Results", lines=25),
        title="AI Financial Statement Analyzer",
        description="""Upload financial statements in Markdown format for AI-powered analysis.
The analysis combines LLM-based insights with sentiment analysis.""",
    )
    return iface
if __name__ == "__main__":
    # Build the UI only when executed as a script, then start serving it.
    iface = create_interface()
    iface.launch()