Spaces:
Sleeping
Sleeping
from fastapi import APIRouter, HTTPException,Depends,File, UploadFile | |
from fastapi.responses import JSONResponse | |
from config.database import admin_collection, user_collection,notification_collection,pothole_image_collection | |
from model.pothole_model import load_image_model | |
from utils.auth import create_access_token, hash_password, verify_password, verify_token | |
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
from schema.model import Admin, PoholeInfo, PotInfoById, PotholeFilters, PotholeModel, UpdatePotholeInfo, User, UserLogin, VerifyOtp | |
from utils.email_validator import send_activation_email | |
import requests | |
import uuid | |
from bson import ObjectId | |
import random | |
security_scheme = HTTPBearer() | |
router = APIRouter() | |
activation_tokens={} | |
import firebase_admin | |
from firebase_admin import credentials, storage | |
# Initialize Firebase Admin SDK | |
cred = credentials.Certificate("pothole-detection-c31a0-firebase-adminsdk-xirxs-4ad5f554ca.json") | |
firebase_admin.initialize_app(cred, { | |
'storageBucket': 'pothole-detection-c31a0.appspot.com' | |
}) | |
def generate_otp(): | |
return str(random.randint(100000, 999999)) | |
#----------------------------------------Admin Collections---------------------------------------- | |
# Admin Registration Api | |
async def register_admin(admin: Admin): | |
hashed_password = hash_password(admin.password) | |
admin_dict = admin.dict() | |
admin_dict['password'] = hashed_password | |
# activation_otp = generate_otp() | |
# admin_dict['otp'] = activation_otp | |
result = admin_collection.insert_one(admin_dict) | |
return {"message": "Admin created successfully", "user_id": str(result.inserted_id)} | |
#Api to get all admins | |
# @router.get("/api/admins/getAllAdmin", tags=["admin"]) | |
# async def get_all_admins(): | |
# admins = admin_collection.find({}) | |
# admin_list = [] | |
# for admin in admins: | |
# admin["_id"] = str(admin["_id"]) | |
# admin_list.append(admin) | |
# return admin_list | |
#-------------------------------------------User Collections------------------------------------------- | |
# User Login API | |
async def user_login(userLogin: UserLogin): | |
# Check if the provided username and password are for admin | |
if userLogin.userName == "admin001" and userLogin.password == "Admin@123": | |
collection = admin_collection | |
else: | |
collection = user_collection | |
user = collection.find_one({"userName": userLogin.userName}) | |
if not user or not verify_password(userLogin.password, user.get('password', '')): | |
raise HTTPException(status_code=401, detail="Incorrect username or password") | |
# Check if the user is verified | |
if not user.get('isVerified'): | |
raise HTTPException(status_code=403, detail="Account is not verified") | |
# Extract additional user data | |
user_data = { | |
"email": user.get("email"), | |
"username": user.get("userName"), | |
"id": str(user.get("_id")), | |
"role":user.get("role") | |
} | |
token = create_access_token({"sub": userLogin.userName}) | |
response_data = {"access_token": token, "token_type": "bearer", "user_data": user_data} | |
return response_data | |
# User Registration Api | |
async def create_user(user: User): | |
hashed_password = hash_password(user.password) | |
user_dict = user.dict() | |
user_dict['password'] = hashed_password | |
# activation_otp = generate_otp() | |
# user_dict['otp'] = activation_otp | |
# Send activation email | |
# send_activation_email(user.email, activation_otp) | |
# result = user_collection.insert_one(user_dict) | |
# return {"message": "User created successfully. Activation email sent.",} | |
result = user_collection.insert_one(user_dict) | |
return {"message": "User created successfully", "userId": str(result.inserted_id)} | |
# Otp Verification Api | |
async def verify_otp(verifyOtp:VerifyOtp): | |
# Check if the user exists with the given email | |
user = user_collection.find_one({"email": verifyOtp.email}) | |
if user is None: | |
raise HTTPException(status_code=404, detail="User not found") | |
# Check if the provided OTP matches the stored OTP | |
if user.get('otp') != verifyOtp.otp: | |
raise HTTPException(status_code=400, detail="Invalid OTP") | |
# Update the user's isVerified field to True | |
user_collection.update_one({"email": verifyOtp.email}, {"$set": {"isVerified": True}}) | |
return {"message": "OTP verified successfully and user is verified"} | |
# Api to show the registered users list | |
async def get_all_user(token: HTTPAuthorizationCredentials = Depends(security_scheme)): | |
verify_token(token.credentials) | |
users = user_collection.find({}) | |
user_list = [] | |
for user in users: | |
user["_id"] = str(user["_id"]) # Convert ObjectId to string | |
user_list.append(user) | |
return user_list | |
#-------------------------------------------Pothole Collections------------------------------------------- | |
# Api to submit the information about pothole | |
async def upload_file(potholeInfo:PoholeInfo,token: HTTPAuthorizationCredentials = Depends(security_scheme)): | |
verify_token(token.credentials) | |
pothole_info = potholeInfo.dict() | |
notification_collection.insert_one(pothole_info) | |
return {"message": "information submitted sucesssuccessfully"} | |
# Api to update the information about pothole | |
async def update_pothole_information(update_data: UpdatePotholeInfo, token: HTTPAuthorizationCredentials = Depends(security_scheme)): | |
verify_token(token.credentials) | |
try: | |
# Update the pothole information in the collection | |
result = notification_collection.update_one({"_id": ObjectId(update_data.infoID)}, {"$set": {"status": update_data.status, "assignee": update_data.assignee}}) | |
if result.modified_count == 1: | |
return {"message": "Pothole information updated successfully"} | |
else: | |
return {"message": "No changes were made. Pothole information remains unchanged."} | |
except Exception as e: | |
return JSONResponse(content={"message": f"Error occurred: {str(e)}"}, status_code=500) | |
# @router.get("/api/information/getAllFiles", tags=["pothole"], dependencies=[Depends(security_scheme)]) | |
# async def get_all_user(token: HTTPAuthorizationCredentials = Depends(security_scheme)): | |
# verify_token(token.credentials) | |
# files = pothole_image_collection.find({}) | |
# file_list = [] | |
# for file in files: | |
# file["_id"] = str(file["_id"]) # Convert ObjectId to string | |
# file_list.append(file) | |
# return file_list | |
# Function to upload file to Firebase Storage | |
import os | |
async def upload_file_to_firebase(file: UploadFile, token: HTTPAuthorizationCredentials): | |
verify_token(token.credentials) | |
try: | |
# Generate a unique ID for the file | |
fileID = uuid.uuid4().hex | |
# Extract the filename from the path provided by Flutter | |
filename = os.path.basename(file.filename) | |
# Get reference to Firebase Storage bucket | |
bucket = storage.bucket() | |
# Create a blob object with the filename | |
blob = bucket.blob(fileID + "-" + filename) | |
# Upload the file | |
blob.upload_from_file(file.file) | |
blob.make_public() | |
# Get the public URL of the uploaded file | |
url = blob.public_url | |
# Insert the document into the collection with Firebase URL | |
pothole_image_collection.insert_one({"fileID": fileID, "url": url, "filename": filename}) | |
return JSONResponse(content={"message": "file uploaded successfully", "file_id": fileID}) | |
except Exception as e: | |
return JSONResponse(content={"message": f"Error occurred: {str(e)}"}, status_code=500) | |
async def upload_file_api(file: UploadFile = File(...), token: HTTPAuthorizationCredentials = Depends(security_scheme)): | |
return await upload_file_to_firebase(file, token) | |
# Api to get all information about pothole | |
async def get_all_info_with_filters(filters: PotholeFilters, token: HTTPAuthorizationCredentials = Depends(security_scheme)): | |
verify_token(token.credentials) | |
# Construct a filter query based on provided parameters | |
filter_query = {} | |
if filters.userID == "66142a506d8f2a0f116fd613": | |
# If userID is the specific ID, do not filter by userID | |
pass | |
elif filters.userID: | |
filter_query["userId"] = filters.userID | |
# Always apply the status filter | |
if filters.status: | |
filter_query["status"] = filters.status | |
# Apply the filter query to find relevant pothole information | |
pothole_informations = notification_collection.find(filter_query) | |
info_list = [] | |
for info in pothole_informations: | |
info["_id"] = str(info["_id"]) | |
file_id = info.get('fileID') | |
if file_id: | |
image_data = pothole_image_collection.find_one({"fileID": file_id}) | |
if image_data: | |
info['image'] = image_data.get('url') | |
else: | |
print("No image data found for file ID:", file_id) # Debugging print | |
info_list.append(info) | |
return info_list | |
# Api to get information about pothole by unique id | |
async def get_data_by_id(potHoleInfoById:PotInfoById,token: HTTPAuthorizationCredentials = Depends(security_scheme)): | |
verify_token(token.credentials) | |
try: | |
object_id = ObjectId(potHoleInfoById.infoID) | |
data = notification_collection.find_one({"_id": object_id}) | |
if data: | |
data["_id"] = str(data["_id"]) | |
image_data = pothole_image_collection.find_one({"fileID": data["fileID"]}) | |
if image_data: | |
data["image"] = image_data["url"] # Append image to the response | |
return data | |
else: | |
raise HTTPException(status_code=404, detail="Data not found") | |
except: | |
raise HTTPException(status_code=400, detail="Invalid ID format") | |
# Api to to verify the given object has pothole or not | |
async def verify_pothole(potholeModel: PotholeModel, token: HTTPAuthorizationCredentials = Depends(security_scheme)): | |
verify_token(token.credentials) | |
try: | |
# Get image bytes from URL | |
response = requests.get(potholeModel.image) | |
image_bytes = response.content | |
# Pass image bytes to your model function | |
results = load_image_model(image_bytes) | |
return JSONResponse(content={"response": results[2:]}) | |
except Exception as e: | |
return JSONResponse(content={"response": f"{e}"}) | |