Files
drb-core-server/app/routers/auth.py
2025-05-25 15:59:16 -04:00

130 lines
4.6 KiB
Python

import functools
from quart import Blueprint, jsonify, request, current_app, abort
from werkzeug.security import generate_password_hash, check_password_hash
from quart_jwt_extended import create_access_token, jwt_required, get_jwt_identity
from internal.auth_wrappers import UserDbController
from internal.types import UserRoles
from uuid import uuid4
# Import the centralized JWTManager instance
from config.jwt_config import jwt as jwt_manager_instance # Renamed to avoid confusion with jwt_required
# Blueprint that groups the authentication routes defined in this module
# (/register, /login, /generate_api_key and the role-gated routes).
auth_bp = Blueprint('auth', __name__)
# Decorator for role-based access control
def role_required(required_role: UserRoles):
    """Build a decorator that restricts an async view to a minimum role.

    Roles are ranked USER < MOD < ADMIN. A request passes when the rank of the
    caller's stored role is at least the rank of ``required_role``. The
    wrapped view is also protected by ``jwt_required``.

    Args:
        required_role: Lowest role that is allowed to reach the view.

    Returns:
        A decorator for async view functions.

    Raises:
        KeyError: At decoration time, if ``required_role`` is not one of the
            ranked roles.
    """
    # Built once per decorated view instead of on every request.
    role_order = {UserRoles.USER: 0, UserRoles.MOD: 1, UserRoles.ADMIN: 2}
    required_rank = role_order[required_role]

    def wrapper(fn):
        @functools.wraps(fn)
        @jwt_required
        async def decorated_view(*args, **kwargs):
            user_id = get_jwt_identity()['id']

            # The role is read from the database rather than trusted from
            # the token payload.
            user = await current_app.user_db_h.find_user({"_id": user_id})
            if not user:
                # No user matches the identity carried by the token.
                abort(401, "User not found or invalid token.")

            # register_user persists the role as its raw string value, so
            # user.role may be a plain string rather than a UserRoles member;
            # UserRoles(...) accepts either form. Unknown or unranked roles
            # fail closed with 403 instead of surfacing as a 500.
            try:
                user_rank = role_order[UserRoles(user.role)]
            except (ValueError, KeyError):
                abort(403, "Permission denied: Insufficient role.")

            if user_rank < required_rank:
                abort(403, "Permission denied: Insufficient role.")

            return await fn(*args, **kwargs)

        return decorated_view

    return wrapper
@auth_bp.route('/register', methods=['POST'])
async def register_user():
    """Create a new user account from a JSON request body.

    Expected JSON fields: ``username`` and ``password`` (required, non-empty
    strings) and an optional ``role`` that defaults to
    ``UserRoles.USER.value``.

    Returns:
        A ``201`` JSON response with a success message and the new user's
        ``_id``.

    Aborts with:
        400: the body is not a JSON object, ``username``/``password`` is
            missing or not a string, or ``role`` is not a valid role value.
        409: the username is already taken.
        500: the database layer did not return a created user.
    """
    # get_json() can yield None (or a non-object) for a missing or
    # non-JSON body; treat that as an empty payload so it ends in a 400
    # instead of an AttributeError.
    payload = await request.get_json()
    data = payload if isinstance(payload, dict) else {}

    username = data.get('username')
    password = data.get('password')
    role = data.get('role', UserRoles.USER.value)  # Default to 'user' role

    # Require real strings: a dict/list here would otherwise be passed
    # straight into the database filter or crash password hashing.
    if (
        not isinstance(username, str)
        or not isinstance(password, str)
        or not username
        or not password
    ):
        abort(400, "Username and password are required")

    # NOTE(review): the role is chosen by the (unauthenticated) caller, so
    # anyone can self-register as admin. Behaviour is kept as-is here;
    # confirm whether elevated roles should require an authenticated admin.
    # Validated before the DB lookup and the (expensive) password hash.
    try:
        user_role = UserRoles(role)
    except ValueError:
        valid_roles = [member.value for member in UserRoles]
        abort(400, f"Invalid role: {role}. Must be one of {valid_roles}")

    existing_user = await current_app.user_db_h.find_user({"username": username})
    if existing_user:
        abort(409, "Username already exists")

    user_data = {
        "username": username,
        "password_hash": generate_password_hash(password),
        "role": user_role.value,
    }
    new_user = await current_app.user_db_h.create_user(user_data)
    if not new_user:
        abort(500, "Failed to register user")

    return jsonify({"message": "User registered successfully", "user_id": new_user._id}), 201
@auth_bp.route('/login', methods=['POST'])
async def login_user():
    """Authenticate a user and issue a JWT access token.

    Expected JSON fields: ``username`` and ``password`` (non-empty strings).

    Returns:
        A ``200`` JSON response ``{"access_token": ...}`` whose identity
        carries the user's ``id`` and ``username``.

    Aborts with:
        400: the body is not a JSON object, or ``username``/``password`` is
            missing or not a string.
        401: the user does not exist or the password does not match.
    """
    # get_json() can yield None (or a non-object) for a missing or
    # non-JSON body; treat that as an empty payload so it ends in a 400
    # instead of an AttributeError.
    payload = await request.get_json()
    data = payload if isinstance(payload, dict) else {}

    username = data.get('username')
    password = data.get('password')

    # Require real strings: a dict/list here would otherwise be passed
    # straight into the database filter or crash check_password_hash.
    if (
        not isinstance(username, str)
        or not isinstance(password, str)
        or not username
        or not password
    ):
        abort(400, "Username and password are required")

    user = await current_app.user_db_h.find_user({"username": username})
    # One generic message for both failure modes, so the response does not
    # reveal whether the username exists.
    if not user or not check_password_hash(user.password_hash, password):
        abort(401, "Invalid credentials")

    access_token = create_access_token(identity={"id": user._id, "username": user.username})
    return jsonify(access_token=access_token), 200
@auth_bp.route('/generate_api_key', methods=['POST'])
@jwt_required
async def generate_api_key():
    """Generate and store a new API key for a user.

    The optional JSON field ``user_id`` selects the target account and
    defaults to the caller. Admins and mods may target any account; other
    users may only target themselves.

    Returns:
        A ``200`` JSON response with a message and the new ``api_key``.

    Aborts with:
        400: ``user_id`` is a JSON object or array.
        401: no user matches the identity carried by the token.
        403: a non-privileged caller targeted another account.
        404: the target user was not found or could not be updated.
    """
    user_id = get_jwt_identity()['id']

    # The token identity issued by login_user carries only "id" and
    # "username", so the role must be looked up in the database; reading
    # identity['role'] raised KeyError on every call.
    caller = await current_app.user_db_h.find_user({"_id": user_id})
    if not caller:
        abort(401, "User not found or invalid token.")

    # The stored role may be a UserRoles member or its raw string value;
    # UserRoles(...) accepts either. Unknown roles get no extra privileges.
    try:
        is_privileged = UserRoles(caller.role) in (UserRoles.ADMIN, UserRoles.MOD)
    except ValueError:
        is_privileged = False

    # get_json() can yield None (or a non-object) when no JSON body is sent;
    # in that case the key is generated for the caller.
    payload = await request.get_json()
    body = payload if isinstance(payload, dict) else {}
    target_user_id = body.get('user_id', user_id)

    # Never let a client-supplied object/array reach the database filter.
    if isinstance(target_user_id, (dict, list)):
        abort(400, "user_id must be a scalar value")

    if not is_privileged and target_user_id != user_id:
        abort(403, "Permission denied: You can only generate an API key for your own account.")

    new_api_key = str(uuid4())
    updated_count = await current_app.user_db_h.update_user(
        {"_id": target_user_id},
        {"$set": {"api_key": new_api_key}},
    )
    if not updated_count:
        abort(404, f"User {target_user_id} not found or unable to update API key.")

    return jsonify({"message": f"API key generated for user {target_user_id}", "api_key": new_api_key}), 200
@auth_bp.route('/admin_only', methods=['GET'])
# role_required already wraps the view in jwt_required, so a separate
# @jwt_required here would only verify the same token twice.
@role_required(UserRoles.ADMIN)
async def admin_only_route():
    """Return a welcome message; reachable only by users with the ADMIN role."""
    return jsonify({"message": "Welcome, Admin!"}), 200
@auth_bp.route('/mod_or_admin_only', methods=['GET'])
# role_required already wraps the view in jwt_required, so a separate
# @jwt_required here would only verify the same token twice.
@role_required(UserRoles.MOD)
async def mod_or_admin_only_route():
    """Return a welcome message; reachable by users with the MOD role or higher."""
    return jsonify({"message": "Welcome, Mod or Admin!"}), 200