salesiq / backend /app /db /database_mongodb.py
richlai's picture
working on docker
902845d
raw
history blame
5.85 kB
# backend/app/database.py
from motor.motor_asyncio import AsyncIOMotorClient
import datetime
from typing import Optional, List
from .models import User, FileUpload
from bson import Binary
import os
# Get MongoDB connection string from environment variable
MONGO_URI = os.getenv("MONGODB_URI", "mongodb://localhost:27017")
DB_NAME = os.getenv("MONGODB_DB", "aithon")
client = AsyncIOMotorClient(MONGO_URI)
db = client[DB_NAME]
# Collections
users_collection = db.users
files_collection = db.files
async def get_user_by_username(username: str) -> Optional[User]:
"""
Retrieve a user by username
"""
user_doc = await users_collection.find_one({"username": username})
if user_doc:
return User(
username=user_doc["username"],
email=user_doc["email"],
password=user_doc["password"]
)
return None
async def get_user_by_email(email: str) -> Optional[User]:
"""
Retrieve a user by email
"""
user_doc = await users_collection.find_one({"email": email})
if user_doc:
return User(
username=user_doc["username"],
email=user_doc["email"],
password=user_doc["password"]
)
return None
async def create_user(user: User) -> bool:
"""
Create a new user
Returns True if successful, False if user already exists
"""
try:
# Check if username or email already exists
if await get_user_by_username(user.username) or await get_user_by_email(user.email):
return False
user_doc = {
"username": user.username,
"email": user.email,
"password": user.password,
"created_at": datetime.datetime.now(datetime.UTC)
}
await users_collection.insert_one(user_doc)
return True
except Exception as e:
print(f"Error creating user: {e}")
return False
async def save_file(username: str, records: any, filename: str) -> bool:
"""
Save a file to the database
"""
try:
current_time = datetime.datetime.now(datetime.UTC)
file_doc = {
"username": username,
"filename": filename,
"content": records,
"created_at": current_time,
"updated_at": current_time,
"file_type": filename.split('.')[-1] if '.' in filename else 'unknown'
}
# Update if exists, insert if not
result = await files_collection.update_one(
{"username": username, "filename": filename},
{"$set": {
**file_doc,
"updated_at": current_time
}},
upsert=True
)
return bool(result.modified_count or result.upserted_id)
except Exception as e:
print(f"Error saving file: {e}")
return False
async def get_user_files(username: str) -> List[FileUpload]:
"""
Retrieve all files belonging to a user
"""
try:
cursor = files_collection.find({"username": username})
files = []
async for doc in cursor:
files.append(
FileUpload(
filename=doc["filename"],
content=doc["content"],
created_at=doc["created_at"],
updated_at=doc["updated_at"]
)
)
return files
except Exception as e:
print(f"Error retrieving files: {e}")
return []
async def delete_file(username: str, filename: str) -> bool:
"""
Delete a file from the database
"""
try:
result = await files_collection.delete_one({
"username": username,
"filename": filename
})
return bool(result.deleted_count)
except Exception as e:
print(f"Error deleting file: {e}")
return False
async def get_file_by_name(username: str, filename: str) -> Optional[FileUpload]:
"""
Retrieve a specific file by username and filename
"""
try:
doc = await files_collection.find_one({
"username": username,
"filename": filename
})
if doc:
return FileUpload(
filename=doc["filename"],
content=doc["content"].decode() if isinstance(doc["content"], Binary) else str(doc["content"]),
created_at=doc["created_at"],
updated_at=doc["updated_at"]
)
return None
except Exception as e:
print(f"Error retrieving file: {e}")
return None
async def update_user(username: str, update_data: dict) -> bool:
"""
Update user information
"""
try:
result = await users_collection.update_one(
{"username": username},
{"$set": {
**update_data,
"updated_at": datetime.utcnow()
}}
)
return bool(result.modified_count)
except Exception as e:
print(f"Error updating user: {e}")
return False
# Index creation function - call this during application startup
async def create_indexes():
"""
Create necessary indexes for the collections
"""
try:
# Users indexes
await users_collection.create_index("username", unique=True)
await users_collection.create_index("email", unique=True)
# Files indexes
await files_collection.create_index([("username", 1), ("filename", 1)], unique=True)
await files_collection.create_index("created_at")
await files_collection.create_index("updated_at")
return True
except Exception as e:
print(f"Error creating indexes: {e}")
return False
# Optional: Add these to your requirements.txt
# motor==3.3.1
# pymongo==4.5.0