| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616 |
- """
- AWS Credentials Management API endpoints
- Provides credential CRUD operations and validation.
- Requirements: 2.1, 2.4, 2.6, 2.7
- """
- from flask import jsonify, request, g
- from app import db
- from app.api import api_bp
- from app.models import AWSCredential, UserCredential, BaseAssumeRoleConfig
- from app.services import login_required, admin_required, get_current_user_from_context, get_accessible_credentials
- from app.errors import ValidationError, NotFoundError
- from app.scanners.credentials import AWSCredentialProvider, CredentialError
def validate_account_id(account_id: str) -> bool:
    """Validate AWS account ID format (exactly 12 ASCII digits).

    Args:
        account_id: Candidate account ID, normally a string taken from the
            request payload.

    Returns:
        True only when *account_id* is a string of exactly twelve characters
        in the range ``0``-``9``. None, empty, non-string and otherwise
        malformed values yield False instead of raising.
    """
    if not isinstance(account_id, str):
        return False
    # str.isdigit() on its own also accepts non-ASCII digits (fullwidth,
    # superscript, ...), so the value must additionally be pure ASCII.
    return len(account_id) == 12 and account_id.isascii() and account_id.isdigit()
def validate_role_arn(role_arn: str) -> bool:
    """Validate AWS IAM Role ARN format.

    Expected shape: ``arn:<partition>:iam::<account-id>:role/<path-and-name>``
    where the partition is ``aws`` or starts with ``aws-`` (for example
    ``aws-cn``), the region field is empty, the account ID is 12 ASCII
    digits, and the role path/name is non-empty and free of whitespace.

    Args:
        role_arn: Candidate ARN string.

    Returns:
        True when *role_arn* matches the shape above, otherwise False
        (including for None, empty and non-string values).
    """
    if not isinstance(role_arn, str) or not role_arn:
        return False

    # arn : partition : service : region : account-id : resource
    # maxsplit=5 keeps any ':' that appears inside the role path intact.
    parts = role_arn.split(':', 5)
    if len(parts) != 6:
        return False
    prefix, partition, service, region, account_id, resource = parts

    if prefix != 'arn' or service != 'iam' or region:
        return False
    if partition != 'aws' and not partition.startswith('aws-'):
        return False
    if not (len(account_id) == 12 and account_id.isascii() and account_id.isdigit()):
        return False
    if not resource.startswith('role/'):
        return False

    role_name = resource[len('role/'):]
    return bool(role_name) and not any(ch.isspace() for ch in role_name)
@api_bp.route('/credentials', methods=['GET'])
@login_required
def get_credentials():
    """
    List the credentials visible to the current user, one page at a time

    Query params:
        page: Page number (default: 1; values below 1 become 1)
        pageSize / page_size: Items per page (default: 20, capped at 100)

    Returns:
        {
            "data": [credential dicts produced with mask_sensitive=True],
            "pagination": {"page", "page_size", "total", "total_pages"}
        }
    """
    args = request.args

    # Page number: anything below 1 is normalised to the first page
    page = max(args.get('page', 1, type=int), 1)

    # The frontend sends pageSize, the backend convention is page_size
    requested_size = args.get('pageSize', type=int) or args.get('page_size', type=int) or 20
    if requested_size < 1:
        page_size = 20
    else:
        page_size = min(requested_size, 100)

    # Restrict the listing to what this user may see, newest first
    user = get_current_user_from_context()
    visible = get_accessible_credentials(user).order_by(AWSCredential.created_at.desc())

    total = visible.count()

    # Ceiling division, but never report fewer than one page
    whole_pages, leftover = divmod(total, page_size)
    total_pages = max(1, whole_pages + (1 if leftover else 0))

    skip = (page - 1) * page_size
    rows = visible.offset(skip).limit(page_size).all()

    payload = {
        'data': [row.to_dict(mask_sensitive=True) for row in rows],
        'pagination': {
            'page': page,
            'page_size': page_size,
            'total': total,
            'total_pages': total_pages
        }
    }
    return jsonify(payload), 200
@api_bp.route('/credentials/create', methods=['POST'])
@admin_required
def create_credential():
    """
    Create a new AWS credential

    Request body:
    {
        "name": "string" (required, 1-100 characters after trimming),
        "credential_type": "assume_role" | "access_key" (required),
        "account_id": "string" (12 digits; required for assume_role,
                      optional for access_key - when omitted it is detected
                      through AWSCredentialProvider),
        "role_arn": "string" (required for assume_role),
        "external_id": "string" (optional for assume_role),
        "access_key_id": "string" (required for access_key),
        "secret_access_key": "string" (required for access_key)
    }

    String fields sent as JSON null are treated like omitted fields; any
    other non-string value is rejected with a ValidationError.

    Returns:
        ( credential dict built with mask_sensitive=True, 201 )

    Raises:
        ValidationError: missing body, missing/invalid fields, or the
            access key pair could not be validated during account detection.
    """
    data = request.get_json()

    if not data:
        raise ValidationError(
            message="Request body is required",
            details={"reason": "missing_body"}
        )

    def _text(key):
        """Return the stripped string stored under *key* ('' if absent/null)."""
        value = data.get(key)
        if value is None:
            return ''
        if not isinstance(value, str):
            # Calling .strip() on a non-string would surface as a 500
            raise ValidationError(
                message=f"Field '{key}' must be a string",
                details={"field": key, "reason": "invalid_type"}
            )
        return value.strip()

    # Validate required fields
    required_fields = ['name', 'credential_type']
    if data.get('credential_type') == 'assume_role':
        required_fields.append('account_id')

    missing_fields = [field for field in required_fields if not data.get(field)]

    if missing_fields:
        raise ValidationError(
            message="Missing required fields",
            details={"missing_fields": missing_fields}
        )

    name = _text('name')
    credential_type = data['credential_type']
    account_id = _text('account_id') or None

    # Validate name length
    if not 1 <= len(name) <= 100:
        raise ValidationError(
            message="Name must be between 1 and 100 characters",
            details={"field": "name", "reason": "invalid_length"}
        )

    # Validate credential type
    if credential_type not in ('assume_role', 'access_key'):
        raise ValidationError(
            message="Invalid credential type. Must be 'assume_role' or 'access_key'",
            details={"field": "credential_type", "reason": "invalid_value"}
        )

    # account_id is mandatory for assume_role. For access_key it is optional,
    # but a value that was supplied must still be well-formed because it is
    # stored as-is and skips the auto-detection below.
    if credential_type == 'assume_role' or account_id:
        if not validate_account_id(account_id):
            raise ValidationError(
                message="Invalid AWS account ID. Must be 12 digits",
                details={"field": "account_id", "reason": "invalid_format"}
            )

    # Validate type-specific fields (each is read exactly once)
    role_arn = external_id = access_key_id = secret_access_key = None
    if credential_type == 'assume_role':
        role_arn = _text('role_arn')
        if not role_arn:
            raise ValidationError(
                message="Role ARN is required for assume_role credential type",
                details={"missing_fields": ["role_arn"]}
            )
        if not validate_role_arn(role_arn):
            raise ValidationError(
                message="Invalid Role ARN format",
                details={"field": "role_arn", "reason": "invalid_format"}
            )
        external_id = _text('external_id') or None
    else:  # access_key
        access_key_id = _text('access_key_id')
        secret_access_key = _text('secret_access_key')

        if not access_key_id:
            raise ValidationError(
                message="Access Key ID is required for access_key credential type",
                details={"missing_fields": ["access_key_id"]}
            )
        if not secret_access_key:
            raise ValidationError(
                message="Secret Access Key is required for access_key credential type",
                details={"missing_fields": ["secret_access_key"]}
            )

    # Create credential
    credential = AWSCredential(
        name=name,
        credential_type=credential_type,
        account_id=account_id,  # May be None for access_key until detected below
        is_active=True
    )

    if credential_type == 'assume_role':
        credential.role_arn = role_arn
        credential.external_id = external_id
    else:
        credential.access_key_id = access_key_id
        credential.set_secret_access_key(secret_access_key)

        # Auto-detect account ID for access_key type
        if not account_id:
            try:
                provider = AWSCredentialProvider(
                    credential_type='access_key',
                    credential_config={
                        'access_key_id': access_key_id,
                        'secret_access_key': secret_access_key
                    }
                )
                provider.validate()
                credential.account_id = provider.get_account_id()
            except Exception as e:
                # Any provider failure is reported to the client as a
                # validation problem; the cause is kept for logs/tracebacks.
                raise ValidationError(
                    message=f"Failed to validate Access Key credentials: {str(e)}",
                    details={"reason": "credential_validation_failed"}
                ) from e

    db.session.add(credential)
    db.session.commit()

    return jsonify(credential.to_dict(mask_sensitive=True)), 201
@api_bp.route('/credentials/update', methods=['POST'])
@admin_required
def update_credential():
    """
    Update an existing AWS credential

    Request body:
    {
        "id": number (required),
        "name": "string" (optional),
        "account_id": "string" (optional),
        "role_arn": "string" (optional, for assume_role),
        "external_id": "string" (optional, for assume_role; null/blank clears it),
        "access_key_id": "string" (optional, for access_key),
        "secret_access_key": "string" (optional, for access_key),
        "is_active": boolean (optional)
    }

    Absent, null or empty optional fields are left unchanged. The
    credential type itself cannot be changed here; only the fields that
    belong to the stored type are considered.

    Returns:
        ( updated credential dict built with mask_sensitive=True, 200 )

    Raises:
        ValidationError: missing body/id or an invalid field value.
        NotFoundError: no credential exists with the given id.
    """
    data = request.get_json()

    if not data:
        raise ValidationError(
            message="Request body is required",
            details={"reason": "missing_body"}
        )

    def _text(key):
        """Return the stripped string stored under *key* ('' if absent/null)."""
        value = data.get(key)
        if value is None:
            return ''
        if not isinstance(value, str):
            # Calling .strip() on a non-string would surface as a 500
            raise ValidationError(
                message=f"Field '{key}' must be a string",
                details={"field": key, "reason": "invalid_type"}
            )
        return value.strip()

    # Validate credential ID
    credential_id = data.get('id')
    if not credential_id:
        raise ValidationError(
            message="Credential ID is required",
            details={"missing_fields": ["id"]}
        )

    # Find credential
    credential = db.session.get(AWSCredential, credential_id)
    if not credential:
        raise NotFoundError(
            message="Credential not found",
            details={"credential_id": credential_id}
        )

    # Update name if provided
    if data.get('name'):
        new_name = _text('name')
        if not 1 <= len(new_name) <= 100:
            raise ValidationError(
                message="Name must be between 1 and 100 characters",
                details={"field": "name", "reason": "invalid_length"}
            )
        credential.name = new_name

    # Update account_id if provided
    if data.get('account_id'):
        new_account_id = _text('account_id')
        if not validate_account_id(new_account_id):
            raise ValidationError(
                message="Invalid AWS account ID. Must be 12 digits",
                details={"field": "account_id", "reason": "invalid_format"}
            )
        credential.account_id = new_account_id

    # Update type-specific fields
    if credential.credential_type == 'assume_role':
        if data.get('role_arn'):
            new_role_arn = _text('role_arn')
            if not validate_role_arn(new_role_arn):
                raise ValidationError(
                    message="Invalid Role ARN format",
                    details={"field": "role_arn", "reason": "invalid_format"}
                )
            credential.role_arn = new_role_arn

        if 'external_id' in data:
            # Blank values are stored as None, matching create_credential
            credential.external_id = _text('external_id') or None
    else:  # access_key
        if data.get('access_key_id'):
            new_access_key_id = _text('access_key_id')
            # Whitespace-only input must not overwrite the stored key
            if new_access_key_id:
                credential.access_key_id = new_access_key_id

        if data.get('secret_access_key'):
            new_secret = _text('secret_access_key')
            # Whitespace-only input must not overwrite the stored secret
            if new_secret:
                credential.set_secret_access_key(new_secret)

    # Update is_active if provided
    if 'is_active' in data:
        flag = data['is_active']
        if isinstance(flag, str):
            # bool("false") is True, so textual booleans are parsed explicitly
            normalized = flag.strip().lower()
            if normalized in ('true', '1'):
                flag = True
            elif normalized in ('false', '0'):
                flag = False
            else:
                raise ValidationError(
                    message="is_active must be a boolean",
                    details={"field": "is_active", "reason": "invalid_value"}
                )
        credential.is_active = bool(flag)

    db.session.commit()

    return jsonify(credential.to_dict(mask_sensitive=True)), 200
@api_bp.route('/credentials/delete', methods=['POST'])
@admin_required
def delete_credential():
    """
    Remove an AWS credential identified by its ID

    Request body:
    {
        "id": number (required)
    }

    Returns:
        ( { "message": "Credential deleted successfully" }, 200 )

    Raises:
        ValidationError: the body or the "id" field is missing/empty.
        NotFoundError: no credential exists with the given id.
    """
    payload = request.get_json()
    if not payload:
        raise ValidationError(
            message="Request body is required",
            details={"reason": "missing_body"}
        )

    # The ID must be present and truthy
    target_id = payload.get('id')
    if not target_id:
        raise ValidationError(
            message="Credential ID is required",
            details={"missing_fields": ["id"]}
        )

    # Look the record up by primary key
    record = db.session.get(AWSCredential, target_id)
    if not record:
        raise NotFoundError(
            message="Credential not found",
            details={"credential_id": target_id}
        )

    # Per the original note, user assignments are expected to be removed by
    # the model's cascade configuration (not visible in this file).
    db.session.delete(record)
    db.session.commit()

    body = {'message': 'Credential deleted successfully'}
    return jsonify(body), 200
def _base_assume_role_credentials():
    """Build the base_credentials mapping handed to AWSCredentialProvider.

    Returns:
        dict with 'access_key_id' and 'secret_access_key' read from the first
        BaseAssumeRoleConfig row.

    Raises:
        ValidationError: no BaseAssumeRoleConfig row exists yet.
    """
    base_config = BaseAssumeRoleConfig.query.first()
    if not base_config:
        raise ValidationError(
            message="Base Assume Role configuration not found. Please configure it first.",
            details={"reason": "missing_base_config"}
        )
    return {
        'access_key_id': base_config.access_key_id,
        'secret_access_key': base_config.get_secret_access_key()
    }


@api_bp.route('/credentials/validate', methods=['POST'])
@admin_required
def validate_credential():
    """
    Validate an AWS credential by running it through AWSCredentialProvider

    Request body:
    {
        "id": number (required) - existing credential ID
    }
    OR
    {
        "credential_type": "assume_role" | "access_key" (required),
        "role_arn": "string" (required for assume_role),
        "external_id": "string" (optional for assume_role),
        "access_key_id": "string" (required for access_key),
        "secret_access_key": "string" (required for access_key)
    }

    String fields sent as JSON null are treated like omitted fields; any
    other non-string value is rejected with a ValidationError.

    Returns:
        { "valid": boolean, "account_id": "string" (if valid), "error": "string" (if invalid) }
        The status code is 200 in both cases; a failed check is reported
        through "valid": false rather than an HTTP error.

    Raises:
        ValidationError: missing body, missing/invalid fields, or missing
            base Assume Role configuration.
        NotFoundError: "id" was given but no such credential exists.
    """
    data = request.get_json()

    if not data:
        raise ValidationError(
            message="Request body is required",
            details={"reason": "missing_body"}
        )

    def _text(key):
        """Return the stripped string stored under *key* ('' if absent/null)."""
        value = data.get(key)
        if value is None:
            return ''
        if not isinstance(value, str):
            # Calling .strip() on a non-string would surface as a 500
            raise ValidationError(
                message=f"Field '{key}' must be a string",
                details={"field": key, "reason": "invalid_type"}
            )
        return value.strip()

    credential_config = {}
    base_credentials = None
    credential_type = None

    # Check if validating existing credential by ID
    if 'id' in data:
        credential_id = data['id']
        credential = db.session.get(AWSCredential, credential_id)
        if not credential:
            raise NotFoundError(
                message="Credential not found",
                details={"credential_id": credential_id}
            )

        credential_type = credential.credential_type

        if credential_type == 'assume_role':
            credential_config = {
                'role_arn': credential.role_arn,
                'external_id': credential.external_id
            }
            base_credentials = _base_assume_role_credentials()
        else:
            credential_config = {
                'access_key_id': credential.access_key_id,
                'secret_access_key': credential.get_secret_access_key()
            }
    else:
        # Validating new credential data
        credential_type = data.get('credential_type')
        if not credential_type:
            raise ValidationError(
                message="Either 'id' or 'credential_type' is required",
                details={"reason": "missing_identifier"}
            )

        if credential_type == 'assume_role':
            role_arn = _text('role_arn')
            if not role_arn:
                raise ValidationError(
                    message="Role ARN is required for assume_role validation",
                    details={"missing_fields": ["role_arn"]}
                )
            credential_config = {
                'role_arn': role_arn,
                'external_id': _text('external_id') or None
            }
            base_credentials = _base_assume_role_credentials()
        elif credential_type == 'access_key':
            access_key_id = _text('access_key_id')
            secret_access_key = _text('secret_access_key')

            if not access_key_id or not secret_access_key:
                raise ValidationError(
                    message="Access Key ID and Secret Access Key are required",
                    details={"missing_fields": ["access_key_id", "secret_access_key"]}
                )
            credential_config = {
                'access_key_id': access_key_id,
                'secret_access_key': secret_access_key
            }
        else:
            raise ValidationError(
                message="Invalid credential type",
                details={"field": "credential_type", "reason": "invalid_value"}
            )

    # Validate the credential. Failures are part of the normal result of
    # this endpoint, so they are returned in the body instead of raised.
    try:
        provider = AWSCredentialProvider(
            credential_type=credential_type,
            credential_config=credential_config,
            base_credentials=base_credentials
        )
        provider.validate()
        account_id = provider.get_account_id()

        return jsonify({
            'valid': True,
            'account_id': account_id
        }), 200

    except CredentialError as e:
        return jsonify({
            'valid': False,
            'error': str(e)
        }), 200
    except Exception as e:
        return jsonify({
            'valid': False,
            'error': f"Validation failed: {str(e)}"
        }), 200
@api_bp.route('/credentials/base-role', methods=['GET'])
@admin_required
def get_base_role():
    """
    Report the base Assume Role configuration, if one has been stored

    Returns:
        { "configured": true, "data": config dict built with mask_sensitive=True }
        OR
        { "configured": false } when no BaseAssumeRoleConfig row exists
    """
    base_config = BaseAssumeRoleConfig.query.first()

    if base_config:
        body = {
            'configured': True,
            'data': base_config.to_dict(mask_sensitive=True)
        }
    else:
        body = {
            'configured': False
        }

    return jsonify(body), 200
@api_bp.route('/credentials/base-role', methods=['POST'])
@admin_required
def update_base_role():
    """
    Create or update the base Assume Role configuration

    Request body:
    {
        "access_key_id": "string" (required),
        "secret_access_key": "string" (required)
    }

    The key pair is checked through AWSCredentialProvider before anything
    is written. The first BaseAssumeRoleConfig row is updated when one
    exists; otherwise a new row is created.

    Returns:
        ( { "message": ..., "data": config dict built with mask_sensitive=True }, 200 )

    Raises:
        ValidationError: missing body, missing/non-string fields, or the
            key pair failed validation.
    """
    data = request.get_json()

    if not data:
        raise ValidationError(
            message="Request body is required",
            details={"reason": "missing_body"}
        )

    def _text(key):
        """Return the stripped string stored under *key* ('' if absent/null)."""
        value = data.get(key)
        if value is None:
            return ''
        if not isinstance(value, str):
            # Calling .strip() on a non-string would surface as a 500
            raise ValidationError(
                message=f"Field '{key}' must be a string",
                details={"field": key, "reason": "invalid_type"}
            )
        return value.strip()

    # Validate required fields
    access_key_id = _text('access_key_id')
    secret_access_key = _text('secret_access_key')

    if not access_key_id:
        raise ValidationError(
            message="Access Key ID is required",
            details={"missing_fields": ["access_key_id"]}
        )

    if not secret_access_key:
        raise ValidationError(
            message="Secret Access Key is required",
            details={"missing_fields": ["secret_access_key"]}
        )

    # Validate the credentials before saving
    try:
        provider = AWSCredentialProvider(
            credential_type='access_key',
            credential_config={
                'access_key_id': access_key_id,
                'secret_access_key': secret_access_key
            }
        )
        provider.validate()
    except CredentialError as e:
        raise ValidationError(
            message=f"Invalid credentials: {str(e)}",
            details={"reason": "validation_failed"}
        ) from e
    except Exception as e:
        # Unexpected provider failures are still reported as validation
        # problems; the original exception is chained for diagnostics.
        raise ValidationError(
            message=f"Credential validation failed: {str(e)}",
            details={"reason": "validation_error"}
        ) from e

    # Get or create config
    config = BaseAssumeRoleConfig.query.first()

    if config:
        # Update existing config
        config.access_key_id = access_key_id
        config.set_secret_access_key(secret_access_key)
    else:
        # Create new config
        config = BaseAssumeRoleConfig(
            access_key_id=access_key_id
        )
        config.set_secret_access_key(secret_access_key)
        db.session.add(config)

    db.session.commit()

    return jsonify({
        'message': 'Base Assume Role configuration updated successfully',
        'data': config.to_dict(mask_sensitive=True)
    }), 200
|