salesiq / backend /app /db /database_mongodb.py
richlai's picture
fix date issue for upload
c63a1e8
# backend/app/database.py
from fastapi import Request, Depends, HTTPException
from fastapi.responses import JSONResponse
from motor.motor_asyncio import AsyncIOMotorClient
import datetime
from typing import Optional, List
from .models import User, FileUpload, Opportunity
from bson import Binary, ObjectId
import os
import json
# Get MongoDB connection string from environment variable
MONGO_URI = os.getenv("MONGODB_URI", "mongodb://localhost:27017")
DB_NAME = os.getenv("MONGODB_DB", "aithon")
client = AsyncIOMotorClient(MONGO_URI)
db = client[DB_NAME]
# Collections
users_collection = db.users
files_collection = db.files
opportunities_collection = db.opportunities
async def get_user_by_username(username: str) -> Optional[User]:
"""
Retrieve a user by username
"""
user_doc = await users_collection.find_one({"username": username})
if user_doc:
return User(
username=user_doc["username"],
email=user_doc["email"],
password=user_doc["password"]
)
return None
async def get_user_by_email(email: str) -> Optional[User]:
"""
Retrieve a user by email
"""
user_doc = await users_collection.find_one({"email": email})
if user_doc:
return User(
username=user_doc["username"],
email=user_doc["email"],
password=user_doc["password"]
)
return None
async def create_user(user: User) -> bool:
"""
Create a new user
Returns True if successful, False if user already exists
"""
try:
# Check if username or email already exists
if await get_user_by_username(user.username) or await get_user_by_email(user.email):
return False
user_doc = {
"username": user.username,
"email": user.email,
"password": user.password,
"created_at": datetime.datetime.now(datetime.UTC)
}
await users_collection.insert_one(user_doc)
return True
except Exception as e:
print(f"Error creating user: {e}")
return False
async def save_file(username: str, records: any, filename: str) -> bool:
"""
Save a file to the database
"""
try:
current_time = datetime.datetime.now(datetime.UTC)
file_doc = {
"username": username,
"filename": filename,
"content": records,
"created_at": current_time,
"updated_at": current_time,
"file_type": filename.split('.')[-1] if '.' in filename else 'unknown'
}
# Update if exists, insert if not
result = await files_collection.update_one(
{"username": username, "filename": filename},
{"$set": {
**file_doc,
"updated_at": current_time
}},
upsert=True
)
for content in records: #assume csv is the same format for all files
opportunity = Opportunity(
opportunityId=content["Opportunity ID"],
opportunityName=content["Opportunity Name"],
opportunityState=content["Opportunity Stage"],
opportunityValue=content["Opportunity Value"],
customerName=content["Customer Name"],
customerContact=content["Customer Contact"],
customerContactRole=content["Customer Contact Role"],
nextSteps=content["Next Steps"],
opportunityDescription=content["Opportunity Description"],
activity=content["Activity"],
closeDate=parse_date_string(content.get("Close Date")),
created_at=current_time,
updated_at=current_time,
username=username
)
await create_opportunity(opportunity)
return bool(result.modified_count or result.upserted_id)
except Exception as e:
print(f"Error saving file: {e}")
return False
async def get_user_files(username: str) -> List[FileUpload]:
"""
Retrieve all files belonging to a user
"""
try:
cursor = files_collection.find({"username": username})
files = []
async for doc in cursor:
files.append(
FileUpload(
filename=doc["filename"],
content=doc["content"],
created_at=doc["created_at"],
updated_at=doc["updated_at"]
)
)
return files
except Exception as e:
print(f"Error retrieving files: {e}")
return []
async def delete_file(username: str, filename: str) -> bool:
"""
Delete a file from the database
"""
try:
result = await files_collection.delete_one({
"username": username,
"filename": filename
})
return bool(result.deleted_count)
except Exception as e:
print(f"Error deleting file: {e}")
return False
async def get_file_by_name(username: str, filename: str) -> Optional[FileUpload]:
"""
Retrieve a specific file by username and filename
"""
try:
doc = await files_collection.find_one({
"username": username,
"filename": filename
})
if doc:
return FileUpload(
filename=doc["filename"],
content=doc["content"].decode() if isinstance(doc["content"], Binary) else str(doc["content"]),
created_at=doc["created_at"],
updated_at=doc["updated_at"]
)
return None
except Exception as e:
print(f"Error retrieving file: {e}")
return None
async def update_user(username: str, update_data: dict) -> bool:
"""
Update user information
"""
try:
result = await users_collection.update_one(
{"username": username},
{"$set": {
**update_data,
"updated_at": datetime.utcnow()
}}
)
return bool(result.modified_count)
except Exception as e:
print(f"Error updating user: {e}")
return False
# Opportunities
async def get_opportunities(username: str, skip: int = 0, limit: int = 100) -> List[Opportunity]:
"""
Retrieve opportunities belonging to a user with pagination
"""
cursor = opportunities_collection.find({"username": username}).skip(skip).limit(limit)
opportunities = await cursor.to_list(length=None)
return [Opportunity(**doc) for doc in opportunities]
async def get_opportunity_count(username: str) -> int:
"""
Get the total number of opportunities for a user
"""
return await opportunities_collection.count_documents({"username": username})
async def create_opportunity(opportunity: Opportunity) -> bool:
"""
Create a new opportunity
"""
#opportunity.created_at = datetime.datetime.now(datetime.UTC)
#opportunity.updated_at = datetime.datetime.now(datetime.UTC)
print("opportunity********", opportunity)
await opportunities_collection.insert_one(opportunity.model_dump())
return True
# Index creation function - call this during application startup
async def create_indexes():
"""
Create necessary indexes for the collections
"""
try:
# Users indexes
await users_collection.create_index("username", unique=True)
await users_collection.create_index("email", unique=True)
# Files indexes
await files_collection.create_index([("username", 1), ("filename", 1)], unique=True)
await files_collection.create_index("created_at")
await files_collection.create_index("updated_at")
# Opportunities indexes
await opportunities_collection.create_index("username")
await opportunities_collection.create_index("created_at")
await opportunities_collection.create_index("updated_at")
return True
except Exception as e:
print(f"Error creating indexes: {e}")
return False
def parse_date_string(value: str) -> datetime:
"""
Parse different date string formats into datetime object
"""
if not value:
return None
# List of possible date formats
date_formats = [
"%m/%d/%Y", # 11/30/2024
"%m/%d/%Y %H:%M:%S", # 11/30/2024 14:30:00
"%Y-%m-%d", # 2024-11-30
"%Y-%m-%dT%H:%M:%S", # 2024-11-30T14:30:00
"%Y-%m-%d %H:%M:%S", # 2024-11-30 14:30:00
"%Y-%m-%dT%H:%M:%S.%fZ", # 2024-11-30T14:30:00.000Z
"%Y-%m-%dT%H:%M:%S.%f", # 2024-11-30T14:30:00.000
]
for date_format in date_formats:
try:
return datetime.datetime.strptime(value, date_format)
except ValueError:
continue
raise ValueError(f"Date string '{value}' does not match any expected format")
# Optional: Add these to your requirements.txt
# motor==3.3.1
# pymongo==4.5.0