| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447 |
- """
- User Management API endpoints (Admin only)
- Provides user CRUD operations and credential assignment.
- """
- from flask import jsonify, request
- from app import db
- from app.api import api_bp
- from app.models import User, UserCredential, AWSCredential
- from app.services import admin_required, get_current_user_from_context
- from app.errors import ValidationError, NotFoundError
- def validate_email(email: str) -> bool:
- """Basic email validation"""
- import re
- pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
- return bool(re.match(pattern, email))
- def validate_role(role: str) -> bool:
- """Validate user role"""
- return role in ['admin', 'power_user', 'user']
- @api_bp.route('/users', methods=['GET'])
- @admin_required
- def get_users():
- """
- Get users list with pagination and search
-
- Query params:
- page: Page number (default: 1)
- page_size: Items per page (default: 20, max: 100)
- search: Search term for username or email
-
- Returns:
- {
- "data": [user objects],
- "pagination": {
- "page": 1,
- "page_size": 20,
- "total": 100,
- "total_pages": 5
- }
- }
- """
- # Get pagination parameters
- page = request.args.get('page', 1, type=int)
- # Support both pageSize (frontend) and page_size (backend convention)
- page_size = request.args.get('pageSize', type=int) or request.args.get('page_size', type=int) or 20
- search = request.args.get('search', '', type=str)
-
- # Validate pagination parameters
- if page < 1:
- page = 1
- if page_size < 1:
- page_size = 20
- if page_size > 100:
- page_size = 100
-
- # Build query
- query = User.query
-
- # Apply search filter
- if search:
- search_term = f'%{search}%'
- query = query.filter(
- db.or_(
- User.username.ilike(search_term),
- User.email.ilike(search_term)
- )
- )
-
- # Order by created_at descending
- query = query.order_by(User.created_at.desc())
-
- # Get total count
- total = query.count()
-
- # Calculate total pages
- total_pages = (total + page_size - 1) // page_size if total > 0 else 1
-
- # Apply pagination
- users = query.offset((page - 1) * page_size).limit(page_size).all()
-
- return jsonify({
- 'data': [user.to_dict() for user in users],
- 'pagination': {
- 'page': page,
- 'page_size': page_size,
- 'total': total,
- 'total_pages': total_pages
- }
- }), 200
- @api_bp.route('/users/create', methods=['POST'])
- @admin_required
- def create_user():
- """
- Create a new user
-
- Request body:
- {
- "username": "string" (required),
- "password": "string" (required),
- "email": "string" (required),
- "role": "admin" | "power_user" | "user" (required)
- }
-
- Returns:
- { user object }
- """
- data = request.get_json()
-
- if not data:
- raise ValidationError(
- message="Request body is required",
- details={"reason": "missing_body"}
- )
-
- # Validate required fields
- required_fields = ['username', 'password', 'email', 'role']
- missing_fields = [field for field in required_fields if not data.get(field)]
-
- if missing_fields:
- raise ValidationError(
- message="Missing required fields",
- details={"missing_fields": missing_fields}
- )
-
- username = data['username'].strip()
- password = data['password']
- email = data['email'].strip().lower()
- role = data['role']
-
- # Validate username length
- if len(username) < 3 or len(username) > 50:
- raise ValidationError(
- message="Username must be between 3 and 50 characters",
- details={"field": "username", "reason": "invalid_length"}
- )
-
- # Validate password length
- if len(password) < 6:
- raise ValidationError(
- message="Password must be at least 6 characters",
- details={"field": "password", "reason": "too_short"}
- )
-
- # Validate email format
- if not validate_email(email):
- raise ValidationError(
- message="Invalid email format",
- details={"field": "email", "reason": "invalid_format"}
- )
-
- # Validate role
- if not validate_role(role):
- raise ValidationError(
- message="Invalid role. Must be one of: admin, power_user, user",
- details={"field": "role", "reason": "invalid_value"}
- )
-
- # Check if username already exists
- if User.query.filter_by(username=username).first():
- raise ValidationError(
- message="Username already exists",
- details={"field": "username", "reason": "already_exists"}
- )
-
- # Check if email already exists
- if User.query.filter_by(email=email).first():
- raise ValidationError(
- message="Email already exists",
- details={"field": "email", "reason": "already_exists"}
- )
-
- # Create new user
- user = User(
- username=username,
- email=email,
- role=role,
- is_active=True
- )
- user.set_password(password)
-
- db.session.add(user)
- db.session.commit()
-
- return jsonify(user.to_dict()), 201
- @api_bp.route('/users/update', methods=['POST'])
- @admin_required
- def update_user():
- """
- Update an existing user
-
- Request body:
- {
- "id": number (required),
- "username": "string" (optional),
- "email": "string" (optional),
- "password": "string" (optional),
- "role": "admin" | "power_user" | "user" (optional),
- "is_active": boolean (optional)
- }
-
- Returns:
- { updated user object }
- """
- data = request.get_json()
-
- if not data:
- raise ValidationError(
- message="Request body is required",
- details={"reason": "missing_body"}
- )
-
- # Validate user ID
- user_id = data.get('id')
- if not user_id:
- raise ValidationError(
- message="User ID is required",
- details={"missing_fields": ["id"]}
- )
-
- # Find user
- user = db.session.get(User, user_id)
- if not user:
- raise NotFoundError(
- message="User not found",
- details={"user_id": user_id}
- )
-
- # Get current admin user
- current_user = get_current_user_from_context()
-
- # Prevent admin from deactivating themselves
- if user.id == current_user.id and data.get('is_active') is False:
- raise ValidationError(
- message="Cannot deactivate your own account",
- details={"reason": "self_deactivation"}
- )
-
- # Prevent admin from changing their own role
- if user.id == current_user.id and data.get('role') and data.get('role') != user.role:
- raise ValidationError(
- message="Cannot change your own role",
- details={"reason": "self_role_change"}
- )
-
- # Update username if provided
- if 'username' in data and data['username']:
- new_username = data['username'].strip()
- if len(new_username) < 3 or len(new_username) > 50:
- raise ValidationError(
- message="Username must be between 3 and 50 characters",
- details={"field": "username", "reason": "invalid_length"}
- )
- # Check if username is taken by another user
- existing = User.query.filter_by(username=new_username).first()
- if existing and existing.id != user.id:
- raise ValidationError(
- message="Username already exists",
- details={"field": "username", "reason": "already_exists"}
- )
- user.username = new_username
-
- # Update email if provided
- if 'email' in data and data['email']:
- new_email = data['email'].strip().lower()
- if not validate_email(new_email):
- raise ValidationError(
- message="Invalid email format",
- details={"field": "email", "reason": "invalid_format"}
- )
- # Check if email is taken by another user
- existing = User.query.filter_by(email=new_email).first()
- if existing and existing.id != user.id:
- raise ValidationError(
- message="Email already exists",
- details={"field": "email", "reason": "already_exists"}
- )
- user.email = new_email
-
- # Update password if provided
- if 'password' in data and data['password']:
- if len(data['password']) < 6:
- raise ValidationError(
- message="Password must be at least 6 characters",
- details={"field": "password", "reason": "too_short"}
- )
- user.set_password(data['password'])
-
- # Update role if provided
- if 'role' in data and data['role']:
- if not validate_role(data['role']):
- raise ValidationError(
- message="Invalid role. Must be one of: admin, power_user, user",
- details={"field": "role", "reason": "invalid_value"}
- )
- user.role = data['role']
-
- # Update is_active if provided
- if 'is_active' in data:
- user.is_active = bool(data['is_active'])
-
- db.session.commit()
-
- return jsonify(user.to_dict()), 200
- @api_bp.route('/users/delete', methods=['POST'])
- @admin_required
- def delete_user():
- """
- Delete a user
-
- Request body:
- {
- "id": number (required)
- }
-
- Returns:
- { "message": "User deleted successfully" }
- """
- data = request.get_json()
-
- if not data:
- raise ValidationError(
- message="Request body is required",
- details={"reason": "missing_body"}
- )
-
- # Validate user ID
- user_id = data.get('id')
- if not user_id:
- raise ValidationError(
- message="User ID is required",
- details={"missing_fields": ["id"]}
- )
-
- # Find user
- user = db.session.get(User, user_id)
- if not user:
- raise NotFoundError(
- message="User not found",
- details={"user_id": user_id}
- )
-
- # Get current admin user
- current_user = get_current_user_from_context()
-
- # Prevent admin from deleting themselves
- if user.id == current_user.id:
- raise ValidationError(
- message="Cannot delete your own account",
- details={"reason": "self_deletion"}
- )
-
- # Delete user (cascade will handle related records)
- db.session.delete(user)
- db.session.commit()
-
- return jsonify({
- 'message': 'User deleted successfully'
- }), 200
- @api_bp.route('/users/assign-credentials', methods=['POST'])
- @admin_required
- def assign_credentials():
- """
- Assign credentials to a user
-
- Request body:
- {
- "user_id": number (required),
- "credential_ids": [number] (required)
- }
-
- Returns:
- { "message": "Credentials assigned successfully", "assigned_count": number }
- """
- data = request.get_json()
-
- if not data:
- raise ValidationError(
- message="Request body is required",
- details={"reason": "missing_body"}
- )
-
- # Validate required fields
- user_id = data.get('user_id')
- credential_ids = data.get('credential_ids')
-
- if not user_id:
- raise ValidationError(
- message="User ID is required",
- details={"missing_fields": ["user_id"]}
- )
-
- if credential_ids is None:
- raise ValidationError(
- message="Credential IDs are required",
- details={"missing_fields": ["credential_ids"]}
- )
-
- if not isinstance(credential_ids, list):
- raise ValidationError(
- message="Credential IDs must be a list",
- details={"field": "credential_ids", "reason": "invalid_type"}
- )
-
- # Find user
- user = db.session.get(User, user_id)
- if not user:
- raise NotFoundError(
- message="User not found",
- details={"user_id": user_id}
- )
-
- # Remove existing credential assignments
- UserCredential.query.filter_by(user_id=user_id).delete()
-
- # Assign new credentials
- assigned_count = 0
- for cred_id in credential_ids:
- # Verify credential exists
- credential = db.session.get(AWSCredential, cred_id)
- if credential and credential.is_active:
- assignment = UserCredential(
- user_id=user_id,
- credential_id=cred_id
- )
- db.session.add(assignment)
- assigned_count += 1
-
- db.session.commit()
-
- return jsonify({
- 'message': 'Credentials assigned successfully',
- 'assigned_count': assigned_count
- }), 200
|