| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235 |
- """
- AWS Credential Management Module
- This module handles AWS credential acquisition for both Assume Role and Access Key
- authentication methods.
- Requirements:
- - 2.2: Use centralized base account for Assume Role
- - 2.3: Securely store and use Access Key credentials
- """
- import boto3
- from botocore.exceptions import ClientError, NoCredentialsError
- from typing import Dict, Any, Optional
- import logging
- logger = logging.getLogger(__name__)
class CredentialError(Exception):
    """Signals that AWS credentials could not be obtained or validated."""
class AWSCredentialProvider:
    """
    Provides AWS credentials using either Assume Role or Access Key methods.

    This class handles the complexity of obtaining AWS credentials from different
    sources and provides a unified interface for the scanner.

    Class attributes (override in a subclass to customise):
        DEFAULT_REGION: Region used for the base STS session when the caller
            supplies none, and for the caller-identity lookups.
        ROLE_SESSION_NAME: ``RoleSessionName`` sent with the AssumeRole call.
        SESSION_DURATION_SECONDS: ``DurationSeconds`` requested for the
            assumed-role credentials.
    """

    DEFAULT_REGION = 'us-east-1'
    ROLE_SESSION_NAME = 'AWSResourceScanner'
    SESSION_DURATION_SECONDS = 3600  # 1 hour

    def __init__(
        self,
        credential_type: str,
        credential_config: Dict[str, Any],
        base_credentials: Optional[Dict[str, str]] = None
    ):
        """
        Initialize the credential provider.

        No AWS call is made here; the arguments are only stored.

        Args:
            credential_type: Either 'assume_role' or 'access_key'
            credential_config: Configuration for the credential
                For assume_role: {'role_arn': str, 'external_id': str (optional)}
                For access_key: {'access_key_id': str, 'secret_access_key': str}
            base_credentials: Base account credentials for Assume Role
                {'access_key_id': str, 'secret_access_key': str}
        """
        self.credential_type = credential_type
        self.credential_config = credential_config
        self.base_credentials = base_credentials
        # Not populated by any method of this class; kept for compatibility.
        self._session: Optional[boto3.Session] = None
        # Filled in by validate() / get_account_id() after a successful lookup.
        self._account_id: Optional[str] = None

    def get_session(self, region_name: Optional[str] = None) -> boto3.Session:
        """
        Get a boto3 Session with the configured credentials.

        A new session is built on every call.

        Args:
            region_name: Optional region for the session

        Returns:
            boto3.Session configured with appropriate credentials

        Raises:
            CredentialError: If credentials cannot be obtained or the
                credential type is not recognised
        """
        if self.credential_type == 'assume_role':
            return self._get_assume_role_session(region_name)
        if self.credential_type == 'access_key':
            return self._get_access_key_session(region_name)
        raise CredentialError(f"Unknown credential type: {self.credential_type}")

    def _get_assume_role_session(self, region_name: Optional[str] = None) -> boto3.Session:
        """
        Get a session using Assume Role.

        Uses the base account credentials to assume a role in the target account.

        Args:
            region_name: Region for the returned session. The intermediate
                base session falls back to DEFAULT_REGION when this is empty.

        Returns:
            boto3.Session carrying the temporary assumed-role credentials

        Raises:
            CredentialError: If the role ARN or the base credentials are
                missing or incomplete, or if the AssumeRole call fails
        """
        role_arn = self.credential_config.get('role_arn')
        external_id = self.credential_config.get('external_id')

        if not role_arn:
            raise CredentialError("Role ARN is required for assume_role credential type")

        if not self.base_credentials:
            raise CredentialError("Base credentials are required for assume_role")

        base_key_id = self.base_credentials.get('access_key_id')
        base_secret = self.base_credentials.get('secret_access_key')
        if not base_key_id or not base_secret:
            # Without explicit keys boto3 resolves credentials from its default
            # chain, which would silently bypass the centralized base account.
            # Mirror the check done for access-key credentials instead.
            raise CredentialError(
                "Base credentials must include access_key_id and secret_access_key"
            )

        try:
            # Create base session with the centralized account credentials
            base_session = boto3.Session(
                aws_access_key_id=base_key_id,
                aws_secret_access_key=base_secret,
                region_name=region_name or self.DEFAULT_REGION
            )

            # Use STS to assume the role
            sts_client = base_session.client('sts')

            assume_role_params = {
                'RoleArn': role_arn,
                'RoleSessionName': self.ROLE_SESSION_NAME,
                'DurationSeconds': self.SESSION_DURATION_SECONDS
            }

            # ExternalId is only sent when one is configured.
            if external_id:
                assume_role_params['ExternalId'] = external_id

            response = sts_client.assume_role(**assume_role_params)
            credentials = response['Credentials']

            # Create a new session with the assumed role credentials
            return boto3.Session(
                aws_access_key_id=credentials['AccessKeyId'],
                aws_secret_access_key=credentials['SecretAccessKey'],
                aws_session_token=credentials['SessionToken'],
                region_name=region_name
            )

        except ClientError as e:
            error_code = e.response.get('Error', {}).get('Code', 'Unknown')
            error_message = e.response.get('Error', {}).get('Message', str(e))
            logger.error(
                "Failed to assume role %s: %s - %s",
                role_arn, error_code, error_message
            )
            raise CredentialError(f"Failed to assume role: {error_message}") from e
        except NoCredentialsError as e:
            raise CredentialError("Base credentials are invalid or not configured") from e
        except Exception as e:
            logger.error("Unexpected error assuming role: %s", e)
            raise CredentialError(f"Unexpected error: {str(e)}") from e

    def _get_access_key_session(self, region_name: Optional[str] = None) -> boto3.Session:
        """
        Get a session using Access Key credentials.

        Args:
            region_name: Optional region for the session

        Returns:
            boto3.Session built from the configured access key pair

        Raises:
            CredentialError: If either key is missing or the session cannot
                be created
        """
        access_key_id = self.credential_config.get('access_key_id')
        secret_access_key = self.credential_config.get('secret_access_key')

        if not access_key_id or not secret_access_key:
            raise CredentialError("Access Key ID and Secret Access Key are required")

        try:
            return boto3.Session(
                aws_access_key_id=access_key_id,
                aws_secret_access_key=secret_access_key,
                region_name=region_name
            )
        except Exception as e:
            logger.error("Failed to create session with access key: %s", e)
            raise CredentialError(f"Failed to create session: {str(e)}") from e

    def _lookup_account_id(self) -> Optional[str]:
        """Ask STS which account the configured credentials belong to."""
        session = self.get_session(region_name=self.DEFAULT_REGION)
        identity = session.client('sts').get_caller_identity()
        return identity.get('Account')

    def validate(self) -> bool:
        """
        Validate that the credentials are valid and working.

        Performs an STS GetCallerIdentity call and remembers the account ID
        it reports.

        Returns:
            True if credentials are valid

        Raises:
            CredentialError: If credentials are invalid
        """
        try:
            self._account_id = self._lookup_account_id()
            logger.info("Credentials validated for account: %s", self._account_id)
            return True
        except ClientError as e:
            error_message = e.response.get('Error', {}).get('Message', str(e))
            raise CredentialError(f"Credential validation failed: {error_message}") from e
        except Exception as e:
            # Also wraps a CredentialError raised while building the session.
            raise CredentialError(f"Credential validation failed: {str(e)}") from e

    def get_account_id(self) -> str:
        """
        Get the AWS account ID for the credentials.

        The value cached by an earlier validate() or get_account_id() call is
        returned when available; otherwise STS is queried.

        Returns:
            AWS account ID string

        Raises:
            CredentialError: If the account ID cannot be determined
        """
        if self._account_id:
            return self._account_id

        try:
            self._account_id = self._lookup_account_id()
            return self._account_id
        except Exception as e:
            logger.error("Failed to get account ID: %s", e)
            raise CredentialError(f"Failed to get account ID: {str(e)}") from e
def create_credential_provider_from_model(
    credential,
    base_config=None
) -> AWSCredentialProvider:
    """
    Build an AWSCredentialProvider from database model objects.

    Args:
        credential: AWSCredential model instance. Its ``credential_type``
            selects which fields are read: ``role_arn`` / ``external_id`` for
            'assume_role', otherwise ``access_key_id`` and
            ``get_secret_access_key()``.
        base_config: BaseAssumeRoleConfig model instance (required for
            assume_role). Only consulted for 'assume_role' credentials.

    Returns:
        Configured AWSCredentialProvider
    """
    kind = credential.credential_type
    base_keys = None

    if kind != 'assume_role':
        # Every type other than 'assume_role' is handled as an access key.
        settings = {
            'access_key_id': credential.access_key_id,
            'secret_access_key': credential.get_secret_access_key(),
        }
    else:
        settings = {
            'role_arn': credential.role_arn,
            'external_id': credential.external_id,
        }
        if base_config:
            base_keys = {
                'access_key_id': base_config.access_key_id,
                'secret_access_key': base_config.get_secret_access_key(),
            }

    return AWSCredentialProvider(
        credential_type=kind,
        credential_config=settings,
        base_credentials=base_keys,
    )
|