# backend/app/database.py
"""Async MongoDB access layer for users, uploaded files and opportunities."""

import datetime
import json
import logging
import os
from typing import Any, List, Optional

from bson import Binary, ObjectId
from fastapi import Depends, HTTPException, Request
from fastapi.responses import JSONResponse
from motor.motor_asyncio import AsyncIOMotorClient

from .models import FileUpload, Opportunity, User

logger = logging.getLogger(__name__)

# Get MongoDB connection string from environment variable
MONGO_URI = os.getenv("MONGODB_URI", "mongodb://localhost:27017")
DB_NAME = os.getenv("MONGODB_DB", "aithon")

client = AsyncIOMotorClient(MONGO_URI)
db = client[DB_NAME]

# Collections
users_collection = db.users
files_collection = db.files
opportunities_collection = db.opportunities

# Formats tried, in order, by parse_date_string().
_DATE_FORMATS = (
    "%m/%d/%Y",                # 11/30/2024
    "%m/%d/%Y %H:%M:%S",       # 11/30/2024 14:30:00
    "%Y-%m-%d",                # 2024-11-30
    "%Y-%m-%dT%H:%M:%S",       # 2024-11-30T14:30:00
    "%Y-%m-%d %H:%M:%S",       # 2024-11-30 14:30:00
    "%Y-%m-%dT%H:%M:%S.%fZ",   # 2024-11-30T14:30:00.000Z
    "%Y-%m-%dT%H:%M:%S.%f",    # 2024-11-30T14:30:00.000
)


def _user_from_doc(user_doc: Optional[dict]) -> Optional[User]:
    """Build a User from a users-collection document, or None if absent."""
    if not user_doc:
        return None
    return User(
        username=user_doc["username"],
        email=user_doc["email"],
        password=user_doc["password"],
    )


async def get_user_by_username(username: str) -> Optional[User]:
    """
    Retrieve a user by username.

    Returns the matching User, or None when no document has that username.
    """
    return _user_from_doc(await users_collection.find_one({"username": username}))


async def get_user_by_email(email: str) -> Optional[User]:
    """
    Retrieve a user by email.

    Returns the matching User, or None when no document has that email.
    """
    return _user_from_doc(await users_collection.find_one({"email": email}))


async def create_user(user: User) -> bool:
    """
    Create a new user.

    Returns True if the user was inserted, False if the username or email
    is already taken or if any error occurred (the error is logged).
    """
    try:
        # Check if username or email already exists
        if await get_user_by_username(user.username) or await get_user_by_email(user.email):
            return False

        user_doc = {
            "username": user.username,
            "email": user.email,
            "password": user.password,
            "created_at": datetime.datetime.now(datetime.UTC),
        }
        await users_collection.insert_one(user_doc)
        return True
    except Exception:
        logger.exception("Error creating user")
        return False


async def save_file(username: str, records: Any, filename: str) -> bool:
    """
    Save a file to the database and create one opportunity per record.

    The file document is upserted on (username, filename). Each item of
    ``records`` is then read as a mapping with the CSV column names used
    below and inserted through create_opportunity().

    Returns True if the file document was modified or inserted and every
    record was processed; False if anything raised (the error is logged).
    Note that a failure while processing records happens after the file
    document has already been written.
    """
    try:
        current_time = datetime.datetime.now(datetime.UTC)
        file_type = filename.split(".")[-1] if "." in filename else "unknown"

        # Update if exists, insert if not. created_at lives in $setOnInsert so
        # that re-uploading an existing file keeps its original creation time.
        result = await files_collection.update_one(
            {"username": username, "filename": filename},
            {
                "$set": {
                    "username": username,
                    "filename": filename,
                    "content": records,
                    "updated_at": current_time,
                    "file_type": file_type,
                },
                "$setOnInsert": {"created_at": current_time},
            },
            upsert=True,
        )

        # Assumes the CSV has the same column layout for all files.
        for content in records:
            opportunity = Opportunity(
                opportunityId=content["Opportunity ID"],
                opportunityName=content["Opportunity Name"],
                opportunityState=content["Opportunity Stage"],
                opportunityValue=content["Opportunity Value"],
                customerName=content["Customer Name"],
                customerContact=content["Customer Contact"],
                customerContactRole=content["Customer Contact Role"],
                nextSteps=content["Next Steps"],
                opportunityDescription=content["Opportunity Description"],
                activity=content["Activity"],
                closeDate=parse_date_string(content.get("Close Date")),
                created_at=current_time,
                updated_at=current_time,
                username=username,
            )
            await create_opportunity(opportunity)

        return bool(result.modified_count or result.upserted_id)
    except Exception:
        logger.exception("Error saving file")
        return False


async def get_user_files(username: str) -> List[FileUpload]:
    """
    Retrieve all files belonging to a user.

    Returns an empty list when the user has no files or when an error
    occurred (the error is logged).
    """
    try:
        cursor = files_collection.find({"username": username})
        return [
            FileUpload(
                filename=doc["filename"],
                content=doc["content"],
                created_at=doc["created_at"],
                updated_at=doc["updated_at"],
            )
            async for doc in cursor
        ]
    except Exception:
        logger.exception("Error retrieving files")
        return []


async def delete_file(username: str, filename: str) -> bool:
    """
    Delete a file from the database.

    Returns True if a document was deleted, False if none matched or an
    error occurred (the error is logged).
    """
    try:
        result = await files_collection.delete_one(
            {"username": username, "filename": filename}
        )
        return bool(result.deleted_count)
    except Exception:
        logger.exception("Error deleting file")
        return False


async def get_file_by_name(username: str, filename: str) -> Optional[FileUpload]:
    """
    Retrieve a specific file by username and filename.

    Byte content is decoded to text; any other content is converted with
    str(). Returns None when no document matches or an error occurred
    (the error is logged).
    """
    try:
        doc = await files_collection.find_one(
            {"username": username, "filename": filename}
        )
        if not doc:
            return None

        content = doc["content"]
        # bson.Binary subclasses bytes, so this covers both Binary and plain
        # bytes values; without it plain bytes would be returned as "b'...'".
        text = content.decode() if isinstance(content, bytes) else str(content)
        return FileUpload(
            filename=doc["filename"],
            content=text,
            created_at=doc["created_at"],
            updated_at=doc["updated_at"],
        )
    except Exception:
        logger.exception("Error retrieving file")
        return None


async def update_user(username: str, update_data: dict) -> bool:
    """
    Update user information.

    Sets every key of ``update_data`` on the user document and stamps
    ``updated_at`` with the current UTC time (overriding any ``updated_at``
    supplied by the caller).

    Returns True if a document was modified, False otherwise or on error
    (the error is logged).
    """
    try:
        # `datetime` is the module here, so the timestamp must come from
        # datetime.datetime; calling utcnow() on the module raised
        # AttributeError and made every update fail.
        result = await users_collection.update_one(
            {"username": username},
            {
                "$set": {
                    **update_data,
                    "updated_at": datetime.datetime.now(datetime.UTC),
                }
            },
        )
        return bool(result.modified_count)
    except Exception:
        logger.exception("Error updating user")
        return False


# Opportunities
async def get_opportunities(username: str, skip: int = 0, limit: int = 100) -> List[Opportunity]:
    """
    Retrieve opportunities belonging to a user with pagination.

    ``skip`` and ``limit`` are applied to the query cursor; each returned
    document is passed as keyword arguments to Opportunity.
    """
    cursor = opportunities_collection.find({"username": username}).skip(skip).limit(limit)
    opportunities = await cursor.to_list(length=None)
    return [Opportunity(**doc) for doc in opportunities]


async def get_opportunity_count(username: str) -> int:
    """
    Get the total number of opportunities for a user.
    """
    return await opportunities_collection.count_documents({"username": username})


async def create_opportunity(opportunity: Opportunity) -> bool:
    """
    Create a new opportunity.

    Inserts ``opportunity.model_dump()`` into the opportunities collection
    and returns True. Errors from the insert propagate to the caller.
    """
    logger.debug("Inserting opportunity: %s", opportunity)
    await opportunities_collection.insert_one(opportunity.model_dump())
    return True


# Index creation function - call this during application startup
async def create_indexes():
    """
    Create necessary indexes for the collections.

    Returns True when every index was created, False if any creation
    raised (the error is logged).
    """
    try:
        # Users indexes
        await users_collection.create_index("username", unique=True)
        await users_collection.create_index("email", unique=True)

        # Files indexes
        await files_collection.create_index([("username", 1), ("filename", 1)], unique=True)
        await files_collection.create_index("created_at")
        await files_collection.create_index("updated_at")

        # Opportunities indexes
        await opportunities_collection.create_index("username")
        await opportunities_collection.create_index("created_at")
        await opportunities_collection.create_index("updated_at")

        return True
    except Exception:
        logger.exception("Error creating indexes")
        return False


def parse_date_string(value: Optional[str]) -> Optional[datetime.datetime]:
    """
    Parse different date string formats into a datetime object.

    Leading and trailing whitespace is ignored. Returns None for a missing
    or blank value. The result is a naive datetime: none of the accepted
    formats carries a UTC offset directive, and a trailing "Z" is matched
    literally.

    Raises:
        ValueError: if the value matches none of the accepted formats.
    """
    if not value:
        return None

    text = value.strip()
    if not text:
        return None

    for date_format in _DATE_FORMATS:
        try:
            return datetime.datetime.strptime(text, date_format)
        except ValueError:
            continue

    raise ValueError(f"Date string '{value}' does not match any expected format")


# Optional: Add these to your requirements.txt
# motor==3.3.1
# pymongo==4.5.0