""" User Management API endpoints (Admin only) Provides user CRUD operations and credential assignment. """ from flask import jsonify, request from app import db from app.api import api_bp from app.models import User, UserCredential, AWSCredential from app.services import admin_required, get_current_user_from_context from app.errors import ValidationError, NotFoundError def validate_email(email: str) -> bool: """Basic email validation""" import re pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' return bool(re.match(pattern, email)) def validate_role(role: str) -> bool: """Validate user role""" return role in ['admin', 'power_user', 'user'] @api_bp.route('/users', methods=['GET']) @admin_required def get_users(): """ Get users list with pagination and search Query params: page: Page number (default: 1) page_size: Items per page (default: 20, max: 100) search: Search term for username or email Returns: { "data": [user objects], "pagination": { "page": 1, "page_size": 20, "total": 100, "total_pages": 5 } } """ # Get pagination parameters page = request.args.get('page', 1, type=int) # Support both pageSize (frontend) and page_size (backend convention) page_size = request.args.get('pageSize', type=int) or request.args.get('page_size', type=int) or 20 search = request.args.get('search', '', type=str) # Validate pagination parameters if page < 1: page = 1 if page_size < 1: page_size = 20 if page_size > 100: page_size = 100 # Build query query = User.query # Apply search filter if search: search_term = f'%{search}%' query = query.filter( db.or_( User.username.ilike(search_term), User.email.ilike(search_term) ) ) # Order by created_at descending query = query.order_by(User.created_at.desc()) # Get total count total = query.count() # Calculate total pages total_pages = (total + page_size - 1) // page_size if total > 0 else 1 # Apply pagination users = query.offset((page - 1) * page_size).limit(page_size).all() return jsonify({ 'data': [user.to_dict() for user in users], 'pagination': { 'page': page, 'page_size': page_size, 'total': total, 'total_pages': total_pages } }), 200 @api_bp.route('/users/create', methods=['POST']) @admin_required def create_user(): """ Create a new user Request body: { "username": "string" (required), "password": "string" (required), "email": "string" (required), "role": "admin" | "power_user" | "user" (required) } Returns: { user object } """ data = request.get_json() if not data: raise ValidationError( message="Request body is required", details={"reason": "missing_body"} ) # Validate required fields required_fields = ['username', 'password', 'email', 'role'] missing_fields = [field for field in required_fields if not data.get(field)] if missing_fields: raise ValidationError( message="Missing required fields", details={"missing_fields": missing_fields} ) username = data['username'].strip() password = data['password'] email = data['email'].strip().lower() role = data['role'] # Validate username length if len(username) < 3 or len(username) > 50: raise ValidationError( message="Username must be between 3 and 50 characters", details={"field": "username", "reason": "invalid_length"} ) # Validate password length if len(password) < 6: raise ValidationError( message="Password must be at least 6 characters", details={"field": "password", "reason": "too_short"} ) # Validate email format if not validate_email(email): raise ValidationError( message="Invalid email format", details={"field": "email", "reason": "invalid_format"} ) # Validate role if not validate_role(role): raise ValidationError( message="Invalid role. Must be one of: admin, power_user, user", details={"field": "role", "reason": "invalid_value"} ) # Check if username already exists if User.query.filter_by(username=username).first(): raise ValidationError( message="Username already exists", details={"field": "username", "reason": "already_exists"} ) # Check if email already exists if User.query.filter_by(email=email).first(): raise ValidationError( message="Email already exists", details={"field": "email", "reason": "already_exists"} ) # Create new user user = User( username=username, email=email, role=role, is_active=True ) user.set_password(password) db.session.add(user) db.session.commit() return jsonify(user.to_dict()), 201 @api_bp.route('/users/update', methods=['POST']) @admin_required def update_user(): """ Update an existing user Request body: { "id": number (required), "username": "string" (optional), "email": "string" (optional), "password": "string" (optional), "role": "admin" | "power_user" | "user" (optional), "is_active": boolean (optional) } Returns: { updated user object } """ data = request.get_json() if not data: raise ValidationError( message="Request body is required", details={"reason": "missing_body"} ) # Validate user ID user_id = data.get('id') if not user_id: raise ValidationError( message="User ID is required", details={"missing_fields": ["id"]} ) # Find user user = db.session.get(User, user_id) if not user: raise NotFoundError( message="User not found", details={"user_id": user_id} ) # Get current admin user current_user = get_current_user_from_context() # Prevent admin from deactivating themselves if user.id == current_user.id and data.get('is_active') is False: raise ValidationError( message="Cannot deactivate your own account", details={"reason": "self_deactivation"} ) # Prevent admin from changing their own role if user.id == current_user.id and data.get('role') and data.get('role') != user.role: raise ValidationError( message="Cannot change your own role", details={"reason": "self_role_change"} ) # Update username if provided if 'username' in data and data['username']: new_username = data['username'].strip() if len(new_username) < 3 or len(new_username) > 50: raise ValidationError( message="Username must be between 3 and 50 characters", details={"field": "username", "reason": "invalid_length"} ) # Check if username is taken by another user existing = User.query.filter_by(username=new_username).first() if existing and existing.id != user.id: raise ValidationError( message="Username already exists", details={"field": "username", "reason": "already_exists"} ) user.username = new_username # Update email if provided if 'email' in data and data['email']: new_email = data['email'].strip().lower() if not validate_email(new_email): raise ValidationError( message="Invalid email format", details={"field": "email", "reason": "invalid_format"} ) # Check if email is taken by another user existing = User.query.filter_by(email=new_email).first() if existing and existing.id != user.id: raise ValidationError( message="Email already exists", details={"field": "email", "reason": "already_exists"} ) user.email = new_email # Update password if provided if 'password' in data and data['password']: if len(data['password']) < 6: raise ValidationError( message="Password must be at least 6 characters", details={"field": "password", "reason": "too_short"} ) user.set_password(data['password']) # Update role if provided if 'role' in data and data['role']: if not validate_role(data['role']): raise ValidationError( message="Invalid role. Must be one of: admin, power_user, user", details={"field": "role", "reason": "invalid_value"} ) user.role = data['role'] # Update is_active if provided if 'is_active' in data: user.is_active = bool(data['is_active']) db.session.commit() return jsonify(user.to_dict()), 200 @api_bp.route('/users/delete', methods=['POST']) @admin_required def delete_user(): """ Delete a user Request body: { "id": number (required) } Returns: { "message": "User deleted successfully" } """ data = request.get_json() if not data: raise ValidationError( message="Request body is required", details={"reason": "missing_body"} ) # Validate user ID user_id = data.get('id') if not user_id: raise ValidationError( message="User ID is required", details={"missing_fields": ["id"]} ) # Find user user = db.session.get(User, user_id) if not user: raise NotFoundError( message="User not found", details={"user_id": user_id} ) # Get current admin user current_user = get_current_user_from_context() # Prevent admin from deleting themselves if user.id == current_user.id: raise ValidationError( message="Cannot delete your own account", details={"reason": "self_deletion"} ) # Delete user (cascade will handle related records) db.session.delete(user) db.session.commit() return jsonify({ 'message': 'User deleted successfully' }), 200 @api_bp.route('/users/assign-credentials', methods=['POST']) @admin_required def assign_credentials(): """ Assign credentials to a user Request body: { "user_id": number (required), "credential_ids": [number] (required) } Returns: { "message": "Credentials assigned successfully", "assigned_count": number } """ data = request.get_json() if not data: raise ValidationError( message="Request body is required", details={"reason": "missing_body"} ) # Validate required fields user_id = data.get('user_id') credential_ids = data.get('credential_ids') if not user_id: raise ValidationError( message="User ID is required", details={"missing_fields": ["user_id"]} ) if credential_ids is None: raise ValidationError( message="Credential IDs are required", details={"missing_fields": ["credential_ids"]} ) if not isinstance(credential_ids, list): raise ValidationError( message="Credential IDs must be a list", details={"field": "credential_ids", "reason": "invalid_type"} ) # Find user user = db.session.get(User, user_id) if not user: raise NotFoundError( message="User not found", details={"user_id": user_id} ) # Remove existing credential assignments UserCredential.query.filter_by(user_id=user_id).delete() # Assign new credentials assigned_count = 0 for cred_id in credential_ids: # Verify credential exists credential = db.session.get(AWSCredential, cred_id) if credential and credential.is_active: assignment = UserCredential( user_id=user_id, credential_id=cred_id ) db.session.add(assignment) assigned_count += 1 db.session.commit() return jsonify({ 'message': 'Credentials assigned successfully', 'assigned_count': assigned_count }), 200