""" AWS Credentials Management API endpoints Provides credential CRUD operations and validation. Requirements: 2.1, 2.4, 2.6, 2.7 """ from flask import jsonify, request, g from app import db from app.api import api_bp from app.models import AWSCredential, UserCredential, BaseAssumeRoleConfig from app.services import login_required, admin_required, get_current_user_from_context, get_accessible_credentials from app.errors import ValidationError, NotFoundError from app.scanners.credentials import AWSCredentialProvider, CredentialError def validate_account_id(account_id: str) -> bool: """Validate AWS account ID format (12 digits)""" if not account_id: return False return len(account_id) == 12 and account_id.isdigit() def validate_role_arn(role_arn: str) -> bool: """Validate AWS Role ARN format""" if not role_arn: return False # Basic ARN format: arn:aws:iam::account-id:role/role-name return role_arn.startswith('arn:aws:iam::') and ':role/' in role_arn @api_bp.route('/credentials', methods=['GET']) @login_required def get_credentials(): """ Get credentials list with pagination (sensitive info masked) Query params: page: Page number (default: 1) page_size: Items per page (default: 20, max: 100) Returns: { "data": [credential objects with masked sensitive data], "pagination": {...} } """ # Get pagination parameters page = request.args.get('page', 1, type=int) # Support both pageSize (frontend) and page_size (backend convention) page_size = request.args.get('pageSize', type=int) or request.args.get('page_size', type=int) or 20 # Validate pagination parameters if page < 1: page = 1 if page_size < 1: page_size = 20 if page_size > 100: page_size = 100 # Get current user current_user = get_current_user_from_context() # Get accessible credentials based on user role query = get_accessible_credentials(current_user) # Order by created_at descending query = query.order_by(AWSCredential.created_at.desc()) # Get total count total = query.count() # Calculate total pages total_pages = (total + page_size - 1) // page_size if total > 0 else 1 # Apply pagination credentials = query.offset((page - 1) * page_size).limit(page_size).all() return jsonify({ 'data': [cred.to_dict(mask_sensitive=True) for cred in credentials], 'pagination': { 'page': page, 'page_size': page_size, 'total': total, 'total_pages': total_pages } }), 200 @api_bp.route('/credentials/create', methods=['POST']) @admin_required def create_credential(): """ Create a new AWS credential Request body: { "name": "string" (required), "credential_type": "assume_role" | "access_key" (required), "account_id": "string" (required, 12 digits), "role_arn": "string" (required for assume_role), "external_id": "string" (optional for assume_role), "access_key_id": "string" (required for access_key), "secret_access_key": "string" (required for access_key) } Returns: { credential object } """ data = request.get_json() if not data: raise ValidationError( message="Request body is required", details={"reason": "missing_body"} ) # Validate required fields required_fields = ['name', 'credential_type'] if data.get('credential_type') == 'assume_role': required_fields.append('account_id') missing_fields = [field for field in required_fields if not data.get(field)] if missing_fields: raise ValidationError( message="Missing required fields", details={"missing_fields": missing_fields} ) name = data['name'].strip() credential_type = data['credential_type'] account_id = data.get('account_id', '').strip() if data.get('account_id') else None # Validate name length if len(name) < 1 or len(name) > 100: raise ValidationError( message="Name must be between 1 and 100 characters", details={"field": "name", "reason": "invalid_length"} ) # Validate credential type if credential_type not in ['assume_role', 'access_key']: raise ValidationError( message="Invalid credential type. Must be 'assume_role' or 'access_key'", details={"field": "credential_type", "reason": "invalid_value"} ) # Validate account ID (only required for assume_role) if credential_type == 'assume_role': if not account_id or not validate_account_id(account_id): raise ValidationError( message="Invalid AWS account ID. Must be 12 digits", details={"field": "account_id", "reason": "invalid_format"} ) # Validate type-specific fields if credential_type == 'assume_role': role_arn = data.get('role_arn', '').strip() if not role_arn: raise ValidationError( message="Role ARN is required for assume_role credential type", details={"missing_fields": ["role_arn"]} ) if not validate_role_arn(role_arn): raise ValidationError( message="Invalid Role ARN format", details={"field": "role_arn", "reason": "invalid_format"} ) else: # access_key access_key_id = data.get('access_key_id', '').strip() secret_access_key = data.get('secret_access_key', '').strip() if not access_key_id: raise ValidationError( message="Access Key ID is required for access_key credential type", details={"missing_fields": ["access_key_id"]} ) if not secret_access_key: raise ValidationError( message="Secret Access Key is required for access_key credential type", details={"missing_fields": ["secret_access_key"]} ) # Create credential credential = AWSCredential( name=name, credential_type=credential_type, account_id=account_id, # Will be None for access_key initially is_active=True ) if credential_type == 'assume_role': credential.role_arn = data.get('role_arn', '').strip() credential.external_id = data.get('external_id', '').strip() or None else: access_key_id = data.get('access_key_id', '').strip() secret_access_key = data.get('secret_access_key', '').strip() credential.access_key_id = access_key_id credential.set_secret_access_key(secret_access_key) # Auto-detect account ID for access_key type if not account_id: try: provider = AWSCredentialProvider( credential_type='access_key', credential_config={ 'access_key_id': access_key_id, 'secret_access_key': secret_access_key } ) provider.validate() detected_account_id = provider.get_account_id() credential.account_id = detected_account_id except Exception as e: raise ValidationError( message=f"Failed to validate Access Key credentials: {str(e)}", details={"reason": "credential_validation_failed"} ) db.session.add(credential) db.session.commit() return jsonify(credential.to_dict(mask_sensitive=True)), 201 @api_bp.route('/credentials/update', methods=['POST']) @admin_required def update_credential(): """ Update an existing AWS credential Request body: { "id": number (required), "name": "string" (optional), "account_id": "string" (optional), "role_arn": "string" (optional, for assume_role), "external_id": "string" (optional, for assume_role), "access_key_id": "string" (optional, for access_key), "secret_access_key": "string" (optional, for access_key), "is_active": boolean (optional) } Returns: { updated credential object } """ data = request.get_json() if not data: raise ValidationError( message="Request body is required", details={"reason": "missing_body"} ) # Validate credential ID credential_id = data.get('id') if not credential_id: raise ValidationError( message="Credential ID is required", details={"missing_fields": ["id"]} ) # Find credential credential = db.session.get(AWSCredential, credential_id) if not credential: raise NotFoundError( message="Credential not found", details={"credential_id": credential_id} ) # Update name if provided if 'name' in data and data['name']: new_name = data['name'].strip() if len(new_name) < 1 or len(new_name) > 100: raise ValidationError( message="Name must be between 1 and 100 characters", details={"field": "name", "reason": "invalid_length"} ) credential.name = new_name # Update account_id if provided if 'account_id' in data and data['account_id']: new_account_id = data['account_id'].strip() if not validate_account_id(new_account_id): raise ValidationError( message="Invalid AWS account ID. Must be 12 digits", details={"field": "account_id", "reason": "invalid_format"} ) credential.account_id = new_account_id # Update type-specific fields if credential.credential_type == 'assume_role': if 'role_arn' in data and data['role_arn']: new_role_arn = data['role_arn'].strip() if not validate_role_arn(new_role_arn): raise ValidationError( message="Invalid Role ARN format", details={"field": "role_arn", "reason": "invalid_format"} ) credential.role_arn = new_role_arn if 'external_id' in data: credential.external_id = data['external_id'].strip() if data['external_id'] else None else: # access_key if 'access_key_id' in data and data['access_key_id']: credential.access_key_id = data['access_key_id'].strip() if 'secret_access_key' in data and data['secret_access_key']: credential.set_secret_access_key(data['secret_access_key'].strip()) # Update is_active if provided if 'is_active' in data: credential.is_active = bool(data['is_active']) db.session.commit() return jsonify(credential.to_dict(mask_sensitive=True)), 200 @api_bp.route('/credentials/delete', methods=['POST']) @admin_required def delete_credential(): """ Delete an AWS credential Request body: { "id": number (required) } Returns: { "message": "Credential deleted successfully" } """ data = request.get_json() if not data: raise ValidationError( message="Request body is required", details={"reason": "missing_body"} ) # Validate credential ID credential_id = data.get('id') if not credential_id: raise ValidationError( message="Credential ID is required", details={"missing_fields": ["id"]} ) # Find credential credential = db.session.get(AWSCredential, credential_id) if not credential: raise NotFoundError( message="Credential not found", details={"credential_id": credential_id} ) # Delete credential (cascade will handle user assignments) db.session.delete(credential) db.session.commit() return jsonify({ 'message': 'Credential deleted successfully' }), 200 @api_bp.route('/credentials/validate', methods=['POST']) @admin_required def validate_credential(): """ Validate an AWS credential by testing connection to AWS Request body: { "id": number (required) - existing credential ID } OR { "credential_type": "assume_role" | "access_key" (required), "role_arn": "string" (required for assume_role), "external_id": "string" (optional for assume_role), "access_key_id": "string" (required for access_key), "secret_access_key": "string" (required for access_key) } Returns: { "valid": boolean, "account_id": "string" (if valid), "error": "string" (if invalid) } """ data = request.get_json() if not data: raise ValidationError( message="Request body is required", details={"reason": "missing_body"} ) credential_config = {} base_credentials = None credential_type = None # Check if validating existing credential by ID if 'id' in data: credential_id = data['id'] credential = db.session.get(AWSCredential, credential_id) if not credential: raise NotFoundError( message="Credential not found", details={"credential_id": credential_id} ) credential_type = credential.credential_type if credential_type == 'assume_role': credential_config = { 'role_arn': credential.role_arn, 'external_id': credential.external_id } # Get base credentials for assume role base_config = BaseAssumeRoleConfig.query.first() if not base_config: raise ValidationError( message="Base Assume Role configuration not found. Please configure it first.", details={"reason": "missing_base_config"} ) base_credentials = { 'access_key_id': base_config.access_key_id, 'secret_access_key': base_config.get_secret_access_key() } else: credential_config = { 'access_key_id': credential.access_key_id, 'secret_access_key': credential.get_secret_access_key() } else: # Validating new credential data credential_type = data.get('credential_type') if not credential_type: raise ValidationError( message="Either 'id' or 'credential_type' is required", details={"reason": "missing_identifier"} ) if credential_type == 'assume_role': role_arn = data.get('role_arn', '').strip() if not role_arn: raise ValidationError( message="Role ARN is required for assume_role validation", details={"missing_fields": ["role_arn"]} ) credential_config = { 'role_arn': role_arn, 'external_id': data.get('external_id', '').strip() or None } # Get base credentials for assume role base_config = BaseAssumeRoleConfig.query.first() if not base_config: raise ValidationError( message="Base Assume Role configuration not found. Please configure it first.", details={"reason": "missing_base_config"} ) base_credentials = { 'access_key_id': base_config.access_key_id, 'secret_access_key': base_config.get_secret_access_key() } elif credential_type == 'access_key': access_key_id = data.get('access_key_id', '').strip() secret_access_key = data.get('secret_access_key', '').strip() if not access_key_id or not secret_access_key: raise ValidationError( message="Access Key ID and Secret Access Key are required", details={"missing_fields": ["access_key_id", "secret_access_key"]} ) credential_config = { 'access_key_id': access_key_id, 'secret_access_key': secret_access_key } else: raise ValidationError( message="Invalid credential type", details={"field": "credential_type", "reason": "invalid_value"} ) # Validate the credential try: provider = AWSCredentialProvider( credential_type=credential_type, credential_config=credential_config, base_credentials=base_credentials ) provider.validate() account_id = provider.get_account_id() return jsonify({ 'valid': True, 'account_id': account_id }), 200 except CredentialError as e: return jsonify({ 'valid': False, 'error': str(e) }), 200 except Exception as e: return jsonify({ 'valid': False, 'error': f"Validation failed: {str(e)}" }), 200 @api_bp.route('/credentials/base-role', methods=['GET']) @admin_required def get_base_role(): """ Get base Assume Role configuration Returns: { base role config object with masked sensitive data } OR { "configured": false } if not configured """ config = BaseAssumeRoleConfig.query.first() if not config: return jsonify({ 'configured': False }), 200 return jsonify({ 'configured': True, 'data': config.to_dict(mask_sensitive=True) }), 200 @api_bp.route('/credentials/base-role', methods=['POST']) @admin_required def update_base_role(): """ Update base Assume Role configuration Request body: { "access_key_id": "string" (required), "secret_access_key": "string" (required) } Returns: { base role config object } """ data = request.get_json() if not data: raise ValidationError( message="Request body is required", details={"reason": "missing_body"} ) # Validate required fields access_key_id = data.get('access_key_id', '').strip() secret_access_key = data.get('secret_access_key', '').strip() if not access_key_id: raise ValidationError( message="Access Key ID is required", details={"missing_fields": ["access_key_id"]} ) if not secret_access_key: raise ValidationError( message="Secret Access Key is required", details={"missing_fields": ["secret_access_key"]} ) # Validate the credentials before saving try: provider = AWSCredentialProvider( credential_type='access_key', credential_config={ 'access_key_id': access_key_id, 'secret_access_key': secret_access_key } ) provider.validate() except CredentialError as e: raise ValidationError( message=f"Invalid credentials: {str(e)}", details={"reason": "validation_failed"} ) except Exception as e: raise ValidationError( message=f"Credential validation failed: {str(e)}", details={"reason": "validation_error"} ) # Get or create config config = BaseAssumeRoleConfig.query.first() if config: # Update existing config config.access_key_id = access_key_id config.set_secret_access_key(secret_access_key) else: # Create new config config = BaseAssumeRoleConfig( access_key_id=access_key_id ) config.set_secret_access_key(secret_access_key) db.session.add(config) db.session.commit() return jsonify({ 'message': 'Base Assume Role configuration updated successfully', 'data': config.to_dict(mask_sensitive=True) }), 200