"""API routes for admin/user accounts and pothole reports.

Exposes a single ``router`` with endpoints for registration, login, OTP
verification, pothole report submission/update/query, image upload to
Firebase Storage, and model-based pothole verification.
"""

import logging
import os
import random
import secrets
import uuid

import firebase_admin
import requests
from bson import ObjectId
from bson.errors import InvalidId
from fastapi import APIRouter, HTTPException, Depends, File, UploadFile
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from firebase_admin import credentials, storage

from config.database import (
    admin_collection,
    user_collection,
    notification_collection,
    pothole_image_collection,
)
from model.pothole_model import load_image_model
from schema.model import (
    Admin,
    PoholeInfo,
    PotInfoById,
    PotholeFilters,
    PotholeModel,
    UpdatePotholeInfo,
    User,
    UserLogin,
    VerifyOtp,
)
from utils.auth import create_access_token, hash_password, verify_password, verify_token
from utils.email_validator import send_activation_email

logger = logging.getLogger(__name__)

security_scheme = HTTPBearer()
router = APIRouter()

activation_tokens = {}

# NOTE(review): credentials embedded in source. They only decide which
# collection the login is looked up in (the stored hash is still verified),
# but they should be moved to configuration.
_ADMIN_LOGIN_USERNAME = "admin001"
_ADMIN_LOGIN_PASSWORD = "Admin@123"

# Requests carrying this userID are not filtered by user (see
# get_all_info_with_filters). NOTE(review): presumably the admin account's
# id - confirm and move to configuration.
_UNFILTERED_USER_ID = "66142a506d8f2a0f116fd613"

# Seconds to wait when downloading an image for verification.
_IMAGE_FETCH_TIMEOUT = 10

# Initialize Firebase Admin SDK
cred = credentials.Certificate("pothole-detection-c31a0-firebase-adminsdk-xirxs-4ad5f554ca.json")
firebase_admin.initialize_app(cred, {
    'storageBucket': 'pothole-detection-c31a0.appspot.com'
})


def generate_otp() -> str:
    """Return a random six-digit one-time password as a string.

    Uses ``secrets`` rather than ``random`` because the value is used for
    account verification and must not be predictable.
    """
    # randbelow(900000) yields 0..899999, so the result is 100000..999999.
    return str(secrets.randbelow(900000) + 100000)


#----------------------------------------Admin Collections----------------------------------------

# Admin Registration Api
@router.post("/api/admin/registerAdmin", tags=["admin"])
async def register_admin(admin: Admin):
    """Store a new admin with a hashed password and return its id."""
    hashed_password = hash_password(admin.password)
    admin_dict = admin.dict()
    admin_dict['password'] = hashed_password
    # activation_otp = generate_otp()
    # admin_dict['otp'] = activation_otp
    result = admin_collection.insert_one(admin_dict)
    return {"message": "Admin created successfully", "user_id": str(result.inserted_id)}


#Api to get all admins
# @router.get("/api/admins/getAllAdmin", tags=["admin"])
# async def get_all_admins():
#     admins = admin_collection.find({})
#     admin_list = []
#     for admin in admins:
#         admin["_id"] = str(admin["_id"])
#         admin_list.append(admin)
#     return admin_list


#-------------------------------------------User Collections-------------------------------------------

# User Login API
@router.post("/api/user/userLogin", tags=["user"])
async def user_login(userLogin: UserLogin):
    """Authenticate a user (or the admin) and return an access token.

    Raises:
        HTTPException: 401 when the username is unknown or the password does
            not match the stored hash; 403 when the account is not verified.
    """
    # Check if the provided username and password are for admin
    if (
        userLogin.userName == _ADMIN_LOGIN_USERNAME
        and userLogin.password == _ADMIN_LOGIN_PASSWORD
    ):
        collection = admin_collection
    else:
        collection = user_collection

    user = collection.find_one({"userName": userLogin.userName})
    if not user or not verify_password(userLogin.password, user.get('password', '')):
        raise HTTPException(status_code=401, detail="Incorrect username or password")

    # Check if the user is verified
    if not user.get('isVerified'):
        raise HTTPException(status_code=403, detail="Account is not verified")

    # Extract additional user data
    user_data = {
        "email": user.get("email"),
        "username": user.get("userName"),
        "id": str(user.get("_id")),
        "role": user.get("role"),
    }

    token = create_access_token({"sub": userLogin.userName})
    return {"access_token": token, "token_type": "bearer", "user_data": user_data}


# User Registration Api
@router.post("/api/user/registerUser", tags=["user"])
async def create_user(user: User):
    """Store a new user with a hashed password and return its id."""
    hashed_password = hash_password(user.password)
    user_dict = user.dict()
    user_dict['password'] = hashed_password
    # activation_otp = generate_otp()
    # user_dict['otp'] = activation_otp

    # Send activation email
    # send_activation_email(user.email, activation_otp)
    # result = user_collection.insert_one(user_dict)
    # return {"message": "User created successfully. Activation email sent.",}
    result = user_collection.insert_one(user_dict)
    return {"message": "User created successfully", "userId": str(result.inserted_id)}


# Otp Verification Api
@router.post("/api/user/verifyOtp", tags=["user"])
async def verify_otp(verifyOtp: VerifyOtp):
    """Mark the user with the given email as verified if the OTP matches.

    Raises:
        HTTPException: 404 when no user has that email; 400 when the OTP
            does not match or the user has no OTP stored.
    """
    # Check if the user exists with the given email
    user = user_collection.find_one({"email": verifyOtp.email})
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")

    # Check if the provided OTP matches the stored OTP. A user without a
    # stored OTP must never verify, even if the submitted value is also null.
    stored_otp = user.get('otp')
    if stored_otp is None or stored_otp != verifyOtp.otp:
        raise HTTPException(status_code=400, detail="Invalid OTP")

    # Update the user's isVerified field to True
    user_collection.update_one({"email": verifyOtp.email}, {"$set": {"isVerified": True}})
    return {"message": "OTP verified successfully and user is verified"}


# Api to show the registered users list
@router.get("/api/user/getAllUsers", tags=["user"], dependencies=[Depends(security_scheme)])
async def get_all_user(token: HTTPAuthorizationCredentials = Depends(security_scheme)):
    """Return every registered user, without password hashes or OTPs."""
    verify_token(token.credentials)
    # Secrets are excluded at the query level so they never leave the database.
    users = user_collection.find({}, {"password": 0, "otp": 0})
    user_list = []
    for user in users:
        user["_id"] = str(user["_id"])  # Convert ObjectId to string
        user_list.append(user)
    return user_list


#-------------------------------------------Pothole Collections-------------------------------------------

# Api to submit the information about pothole
@router.post("/api/information/submitInformation", tags=["pothole"], dependencies=[Depends(security_scheme)])
async def upload_file(potholeInfo: PoholeInfo, token: HTTPAuthorizationCredentials = Depends(security_scheme)):
    """Store a submitted pothole report in the notification collection."""
    verify_token(token.credentials)
    pothole_info = potholeInfo.dict()
    notification_collection.insert_one(pothole_info)
    return {"message": "information submitted successfully"}


# Api to update the information about pothole
@router.put("/api/information/updateInformation", tags=["pothole"], dependencies=[Depends(security_scheme)])
async def update_pothole_information(update_data: UpdatePotholeInfo, token: HTTPAuthorizationCredentials = Depends(security_scheme)):
    """Update the status and assignee of an existing pothole report.

    Raises:
        HTTPException: 400 when ``infoID`` is not a valid ObjectId; 404 when
            no report has that id.

    Returns a 500 JSON response if the database update itself fails.
    """
    verify_token(token.credentials)

    # A malformed id is a client error, not a server error.
    try:
        info_id = ObjectId(update_data.infoID)
    except (InvalidId, TypeError) as exc:
        raise HTTPException(status_code=400, detail="Invalid ID format") from exc

    try:
        # Update the pothole information in the collection
        result = notification_collection.update_one(
            {"_id": info_id},
            {"$set": {"status": update_data.status, "assignee": update_data.assignee}},
        )
    except Exception as e:
        logger.exception("Failed to update pothole information")
        return JSONResponse(content={"message": f"Error occurred: {str(e)}"}, status_code=500)

    # matched_count distinguishes "no such report" from "values already equal".
    if result.matched_count == 0:
        raise HTTPException(status_code=404, detail="Data not found")
    if result.modified_count == 1:
        return {"message": "Pothole information updated successfully"}
    return {"message": "No changes were made. Pothole information remains unchanged."}


# @router.get("/api/information/getAllFiles", tags=["pothole"], dependencies=[Depends(security_scheme)])
# async def get_all_user(token: HTTPAuthorizationCredentials = Depends(security_scheme)):
#     verify_token(token.credentials)
#     files = pothole_image_collection.find({})
#     file_list = []
#     for file in files:
#         file["_id"] = str(file["_id"])  # Convert ObjectId to string
#         file_list.append(file)
#     return file_list


# Function to upload file to Firebase Storage
async def upload_file_to_firebase(file: UploadFile, token: HTTPAuthorizationCredentials):
    """Upload ``file`` to Firebase Storage and record its public URL.

    Returns a JSON response carrying the generated ``file_id`` on success,
    or a 500 JSON response with the error text on failure.
    """
    verify_token(token.credentials)
    try:
        # Generate a unique ID for the file
        fileID = uuid.uuid4().hex

        # Extract the filename from the path provided by Flutter
        filename = os.path.basename(file.filename)

        # Get reference to Firebase Storage bucket
        bucket = storage.bucket()

        # Create a blob object with the filename
        blob = bucket.blob(fileID + "-" + filename)

        # Upload the file
        blob.upload_from_file(file.file)
        # NOTE(review): every uploaded image becomes publicly readable.
        blob.make_public()

        # Get the public URL of the uploaded file
        url = blob.public_url

        # Insert the document into the collection with Firebase URL
        pothole_image_collection.insert_one({"fileID": fileID, "url": url, "filename": filename})

        return JSONResponse(content={"message": "file uploaded successfully", "file_id": fileID})
    except Exception as e:
        logger.exception("File upload failed")
        return JSONResponse(content={"message": f"Error occurred: {str(e)}"}, status_code=500)


@router.post("/api/information/fileUpload", tags=["pothole"])
async def upload_file_api(file: UploadFile = File(...), token: HTTPAuthorizationCredentials = Depends(security_scheme)):
    """HTTP entry point that delegates to ``upload_file_to_firebase``."""
    return await upload_file_to_firebase(file, token)


# Api to get all information about pothole
@router.post("/api/information/getAllPotholeInformation", tags=["pothole"], dependencies=[Depends(security_scheme)])
async def get_all_info_with_filters(filters: PotholeFilters, token: HTTPAuthorizationCredentials = Depends(security_scheme)):
    """Return pothole reports matching ``filters``, each with its image URL.

    Reports are filtered by ``userID`` (unless it is the unfiltered id) and
    by ``status`` when those fields are set. A report whose ``fileID`` has a
    matching image record gets an ``image`` key holding that record's URL.
    """
    verify_token(token.credentials)

    # Construct a filter query based on provided parameters
    filter_query = {}
    if filters.userID and filters.userID != _UNFILTERED_USER_ID:
        filter_query["userId"] = filters.userID

    # Always apply the status filter
    if filters.status:
        filter_query["status"] = filters.status

    # Apply the filter query to find relevant pothole information
    info_list = list(notification_collection.find(filter_query))

    # Fetch all referenced images in one query instead of one per report.
    file_ids = [info["fileID"] for info in info_list if info.get("fileID")]
    url_by_file_id = {}
    if file_ids:
        for image_data in pothole_image_collection.find({"fileID": {"$in": file_ids}}):
            # Keep the first record seen per fileID.
            url_by_file_id.setdefault(image_data.get("fileID"), image_data.get("url"))

    for info in info_list:
        info["_id"] = str(info["_id"])
        file_id = info.get('fileID')
        if not file_id:
            continue
        if file_id in url_by_file_id:
            info['image'] = url_by_file_id[file_id]
        else:
            logger.debug("No image data found for file ID: %s", file_id)

    return info_list


# Api to get information about pothole by unique id
@router.post("/api/information/getPotholeInformationById", tags=["pothole"], dependencies=[Depends(security_scheme)])
async def get_data_by_id(potHoleInfoById: PotInfoById, token: HTTPAuthorizationCredentials = Depends(security_scheme)):
    """Return one pothole report by id, with its image URL when available.

    Raises:
        HTTPException: 400 when ``infoID`` is not a valid ObjectId; 404 when
            no report has that id.
    """
    verify_token(token.credentials)

    # Only the id parsing is guarded, so the 404 below is not swallowed and
    # re-reported as "Invalid ID format".
    try:
        object_id = ObjectId(potHoleInfoById.infoID)
    except (InvalidId, TypeError) as exc:
        raise HTTPException(status_code=400, detail="Invalid ID format") from exc

    data = notification_collection.find_one({"_id": object_id})
    if not data:
        raise HTTPException(status_code=404, detail="Data not found")

    data["_id"] = str(data["_id"])
    # Reports without a fileID are returned without an image.
    file_id = data.get("fileID")
    if file_id:
        image_data = pothole_image_collection.find_one({"fileID": file_id})
        if image_data:
            data["image"] = image_data.get("url")  # Append image to the response
    return data


# Api to to verify the given object has pothole or not
@router.post("/api/information/verifyPothole", tags=["pothole"], dependencies=[Depends(security_scheme)])
async def verify_pothole(potholeModel: PotholeModel, token: HTTPAuthorizationCredentials = Depends(security_scheme)):
    """Download the image at ``potholeModel.image`` and run the model on it.

    Returns a JSON response whose ``response`` key holds ``results[2:]``
    from ``load_image_model`` or, on failure, the error text.
    """
    verify_token(token.credentials)
    try:
        # Get image bytes from URL
        # NOTE(review): the URL is client-supplied; consider restricting it
        # to the storage bucket's host to avoid server-side request forgery.
        response = requests.get(potholeModel.image, timeout=_IMAGE_FETCH_TIMEOUT)
        # Do not feed an HTTP error page to the model.
        response.raise_for_status()
        image_bytes = response.content

        # Pass image bytes to your model function
        results = load_image_model(image_bytes)

        return JSONResponse(content={"response": results[2:]})
    except Exception as e:
        # NOTE(review): errors are returned with HTTP 200, kept for
        # compatibility with existing clients.
        logger.exception("Pothole verification failed")
        return JSONResponse(content={"response": f"{e}"})