""" AWS Credential Management Module This module handles AWS credential acquisition for both Assume Role and Access Key authentication methods. Requirements: - 2.2: Use centralized base account for Assume Role - 2.3: Securely store and use Access Key credentials """ import boto3 from botocore.exceptions import ClientError, NoCredentialsError from typing import Dict, Any, Optional import logging logger = logging.getLogger(__name__) class CredentialError(Exception): """Exception raised for credential-related errors""" pass class AWSCredentialProvider: """ Provides AWS credentials using either Assume Role or Access Key methods. This class handles the complexity of obtaining AWS credentials from different sources and provides a unified interface for the scanner. """ def __init__( self, credential_type: str, credential_config: Dict[str, Any], base_credentials: Optional[Dict[str, str]] = None ): """ Initialize the credential provider. Args: credential_type: Either 'assume_role' or 'access_key' credential_config: Configuration for the credential For assume_role: {'role_arn': str, 'external_id': str (optional)} For access_key: {'access_key_id': str, 'secret_access_key': str} base_credentials: Base account credentials for Assume Role {'access_key_id': str, 'secret_access_key': str} """ self.credential_type = credential_type self.credential_config = credential_config self.base_credentials = base_credentials self._session: Optional[boto3.Session] = None self._account_id: Optional[str] = None def get_session(self, region_name: Optional[str] = None) -> boto3.Session: """ Get a boto3 Session with the configured credentials. Args: region_name: Optional region for the session Returns: boto3.Session configured with appropriate credentials Raises: CredentialError: If credentials cannot be obtained """ if self.credential_type == 'assume_role': return self._get_assume_role_session(region_name) elif self.credential_type == 'access_key': return self._get_access_key_session(region_name) else: raise CredentialError(f"Unknown credential type: {self.credential_type}") def _get_assume_role_session(self, region_name: Optional[str] = None) -> boto3.Session: """ Get a session using Assume Role. Uses the base account credentials to assume a role in the target account. Supports optional session token for temporary base credentials. """ role_arn = self.credential_config.get('role_arn') external_id = self.credential_config.get('external_id') if not role_arn: raise CredentialError("Role ARN is required for assume_role credential type") if not self.base_credentials: raise CredentialError("Base credentials are required for assume_role") try: # Create base session with the centralized account credentials base_session_params = { 'aws_access_key_id': self.base_credentials.get('access_key_id'), 'aws_secret_access_key': self.base_credentials.get('secret_access_key'), 'region_name': region_name or 'us-east-1' } # Add session token if provided (for temporary credentials) if self.base_credentials.get('session_token'): base_session_params['aws_session_token'] = self.base_credentials.get('session_token') base_session = boto3.Session(**base_session_params) # Use STS to assume the role sts_client = base_session.client('sts') assume_role_params = { 'RoleArn': role_arn, 'RoleSessionName': 'AWSResourceScanner', 'DurationSeconds': 3600 # 1 hour } if external_id: assume_role_params['ExternalId'] = external_id response = sts_client.assume_role(**assume_role_params) credentials = response['Credentials'] # Create a new session with the assumed role credentials return boto3.Session( aws_access_key_id=credentials['AccessKeyId'], aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken'], region_name=region_name ) except ClientError as e: error_code = e.response.get('Error', {}).get('Code', 'Unknown') error_message = e.response.get('Error', {}).get('Message', str(e)) logger.error(f"Failed to assume role {role_arn}: {error_code} - {error_message}") raise CredentialError(f"Failed to assume role: {error_message}") except NoCredentialsError: raise CredentialError("Base credentials are invalid or not configured") except Exception as e: logger.error(f"Unexpected error assuming role: {str(e)}") raise CredentialError(f"Unexpected error: {str(e)}") def _get_access_key_session(self, region_name: Optional[str] = None) -> boto3.Session: """ Get a session using Access Key credentials. Supports optional session token for temporary credentials. """ access_key_id = self.credential_config.get('access_key_id') secret_access_key = self.credential_config.get('secret_access_key') session_token = self.credential_config.get('session_token') if not access_key_id or not secret_access_key: raise CredentialError("Access Key ID and Secret Access Key are required") try: session_params = { 'aws_access_key_id': access_key_id, 'aws_secret_access_key': secret_access_key, 'region_name': region_name } # Add session token if provided (for temporary credentials) if session_token: session_params['aws_session_token'] = session_token return boto3.Session(**session_params) except Exception as e: logger.error(f"Failed to create session with access key: {str(e)}") raise CredentialError(f"Failed to create session: {str(e)}") def validate(self) -> bool: """ Validate that the credentials are valid and working. Returns: True if credentials are valid Raises: CredentialError: If credentials are invalid """ try: session = self.get_session(region_name='us-east-1') sts_client = session.client('sts') response = sts_client.get_caller_identity() self._account_id = response.get('Account') logger.info(f"Credentials validated for account: {self._account_id}") return True except ClientError as e: error_message = e.response.get('Error', {}).get('Message', str(e)) raise CredentialError(f"Credential validation failed: {error_message}") except Exception as e: raise CredentialError(f"Credential validation failed: {str(e)}") def get_account_id(self) -> str: """ Get the AWS account ID for the credentials. Returns: AWS account ID string """ if self._account_id: return self._account_id try: session = self.get_session(region_name='us-east-1') sts_client = session.client('sts') response = sts_client.get_caller_identity() self._account_id = response.get('Account') return self._account_id except Exception as e: logger.error(f"Failed to get account ID: {str(e)}") raise CredentialError(f"Failed to get account ID: {str(e)}") def create_credential_provider_from_model( credential, base_config=None ) -> AWSCredentialProvider: """ Create an AWSCredentialProvider from database models. Args: credential: AWSCredential model instance base_config: BaseAssumeRoleConfig model instance (required for assume_role) Returns: Configured AWSCredentialProvider """ credential_config = {} base_credentials = None if credential.credential_type == 'assume_role': credential_config = { 'role_arn': credential.role_arn, 'external_id': credential.external_id } if base_config: base_credentials = { 'access_key_id': base_config.access_key_id, 'secret_access_key': base_config.get_secret_access_key() } # Add session token if available (for temporary credentials) session_token = base_config.get_session_token() if session_token: base_credentials['session_token'] = session_token else: # access_key credential_config = { 'access_key_id': credential.access_key_id, 'secret_access_key': credential.get_secret_access_key() } return AWSCredentialProvider( credential_type=credential.credential_type, credential_config=credential_config, base_credentials=base_credentials )