from fastapi import HTTPException, UploadFile, File, Form from typing import Optional import bcrypt import os import shutil from ..utils import get_user_cropped_image_from_photo # Registrering a face async def register_user(db, email: str, name: str, role: str, file: UploadFile = File(...)): """ Processes and stores the image uploaded into vectordb as image embeddings. :param db: The vector db collection handle to which the image embedding with email id as key will be upserted :param email: The email id of the user being registered, this is assumed to be unique per user record :param name: The user name (different from email) for display :param role: The role associated with the user, it can only be student or teacher :param file: The facial image of the user being registered, the first recognized face image would be used. :return: email """ unique_filename = f"{email}.jpg" # Use the email as the filename file_path = f"/home/user/data/tmp/{unique_filename}" # Specify our upload directory # Ensure the directory exists os.makedirs(os.path.dirname(file_path), exist_ok=True) # Then, proceed to open the file with open(file_path, "wb") as buffer: contents = await file.read() buffer.write(contents) # Process the image to extract the face cropped_face = get_user_cropped_image_from_photo(file_path) if cropped_face is not None: # Here we can store the embeddings along with user details in ChromaDB # chroma_db.save_embeddings(user_id, embeddings) db.upsert(images=[cropped_face], ids=[email], metadatas=[{"name":name, "role":role}]) return {"status": "User registered successfully", "image": cropped_face} else: return {"error": "No faces detected"} #os.remove(file_path) # Optionally remove the file after processing, if not needed # Admin Authentication def verify_admin_password(submitted_user: str, submitted_password: str) -> bool: """ Verifies the submitted password against the stored hash. :param submitted_user: The username submitted by the user. :param submitted_password: The password submitted by the user. :return: True if the password is correct, False otherwise. """ if submitted_user == "admin": # Retrieve the stored hash from environment variable stored_password_hash = os.getenv("EC_ADMIN_PWD", "").encode('utf-8') print(stored_password_hash) # Directly compare the submitted password with the stored hash return bcrypt.checkpw(submitted_password.encode('utf-8'), stored_password_hash) return False def get_disk_usage(path="/home/user/data"): total, used, free = shutil.disk_usage(path) # Convert bytes to MB by dividing by 2^20 return { "total": total / (2**20), "used": used / (2**20), "free": free / (2**20) } # Additional Admin Functions # we could include other administrative functionalities here, such as: # - Listing all registered users. # - Moderating chat messages or viewing chat history. # - Managing system settings or configurations.