credentials.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. """
  2. AWS Credential Management Module
  3. This module handles AWS credential acquisition for both Assume Role and Access Key
  4. authentication methods.
  5. Requirements:
  6. - 2.2: Use centralized base account for Assume Role
  7. - 2.3: Securely store and use Access Key credentials
  8. """
  9. import boto3
  10. from botocore.exceptions import ClientError, NoCredentialsError
  11. from typing import Dict, Any, Optional
  12. import logging
  13. logger = logging.getLogger(__name__)
  14. class CredentialError(Exception):
  15. """Exception raised for credential-related errors"""
  16. pass
  17. class AWSCredentialProvider:
  18. """
  19. Provides AWS credentials using either Assume Role or Access Key methods.
  20. This class handles the complexity of obtaining AWS credentials from different
  21. sources and provides a unified interface for the scanner.
  22. """
  23. def __init__(
  24. self,
  25. credential_type: str,
  26. credential_config: Dict[str, Any],
  27. base_credentials: Optional[Dict[str, str]] = None
  28. ):
  29. """
  30. Initialize the credential provider.
  31. Args:
  32. credential_type: Either 'assume_role' or 'access_key'
  33. credential_config: Configuration for the credential
  34. For assume_role: {'role_arn': str, 'external_id': str (optional)}
  35. For access_key: {'access_key_id': str, 'secret_access_key': str}
  36. base_credentials: Base account credentials for Assume Role
  37. {'access_key_id': str, 'secret_access_key': str}
  38. """
  39. self.credential_type = credential_type
  40. self.credential_config = credential_config
  41. self.base_credentials = base_credentials
  42. self._session: Optional[boto3.Session] = None
  43. self._account_id: Optional[str] = None
  44. def get_session(self, region_name: Optional[str] = None) -> boto3.Session:
  45. """
  46. Get a boto3 Session with the configured credentials.
  47. Args:
  48. region_name: Optional region for the session
  49. Returns:
  50. boto3.Session configured with appropriate credentials
  51. Raises:
  52. CredentialError: If credentials cannot be obtained
  53. """
  54. if self.credential_type == 'assume_role':
  55. return self._get_assume_role_session(region_name)
  56. elif self.credential_type == 'access_key':
  57. return self._get_access_key_session(region_name)
  58. else:
  59. raise CredentialError(f"Unknown credential type: {self.credential_type}")
  60. def _get_assume_role_session(self, region_name: Optional[str] = None) -> boto3.Session:
  61. """
  62. Get a session using Assume Role.
  63. Uses the base account credentials to assume a role in the target account.
  64. Supports optional session token for temporary base credentials.
  65. """
  66. role_arn = self.credential_config.get('role_arn')
  67. external_id = self.credential_config.get('external_id')
  68. if not role_arn:
  69. raise CredentialError("Role ARN is required for assume_role credential type")
  70. if not self.base_credentials:
  71. raise CredentialError("Base credentials are required for assume_role")
  72. try:
  73. # Create base session with the centralized account credentials
  74. base_session_params = {
  75. 'aws_access_key_id': self.base_credentials.get('access_key_id'),
  76. 'aws_secret_access_key': self.base_credentials.get('secret_access_key'),
  77. 'region_name': region_name or 'us-east-1'
  78. }
  79. # Add session token if provided (for temporary credentials)
  80. if self.base_credentials.get('session_token'):
  81. base_session_params['aws_session_token'] = self.base_credentials.get('session_token')
  82. base_session = boto3.Session(**base_session_params)
  83. # Use STS to assume the role
  84. sts_client = base_session.client('sts')
  85. assume_role_params = {
  86. 'RoleArn': role_arn,
  87. 'RoleSessionName': 'AWSResourceScanner',
  88. 'DurationSeconds': 3600 # 1 hour
  89. }
  90. if external_id:
  91. assume_role_params['ExternalId'] = external_id
  92. response = sts_client.assume_role(**assume_role_params)
  93. credentials = response['Credentials']
  94. # Create a new session with the assumed role credentials
  95. return boto3.Session(
  96. aws_access_key_id=credentials['AccessKeyId'],
  97. aws_secret_access_key=credentials['SecretAccessKey'],
  98. aws_session_token=credentials['SessionToken'],
  99. region_name=region_name
  100. )
  101. except ClientError as e:
  102. error_code = e.response.get('Error', {}).get('Code', 'Unknown')
  103. error_message = e.response.get('Error', {}).get('Message', str(e))
  104. logger.error(f"Failed to assume role {role_arn}: {error_code} - {error_message}")
  105. raise CredentialError(f"Failed to assume role: {error_message}")
  106. except NoCredentialsError:
  107. raise CredentialError("Base credentials are invalid or not configured")
  108. except Exception as e:
  109. logger.error(f"Unexpected error assuming role: {str(e)}")
  110. raise CredentialError(f"Unexpected error: {str(e)}")
  111. def _get_access_key_session(self, region_name: Optional[str] = None) -> boto3.Session:
  112. """
  113. Get a session using Access Key credentials.
  114. Supports optional session token for temporary credentials.
  115. """
  116. access_key_id = self.credential_config.get('access_key_id')
  117. secret_access_key = self.credential_config.get('secret_access_key')
  118. session_token = self.credential_config.get('session_token')
  119. if not access_key_id or not secret_access_key:
  120. raise CredentialError("Access Key ID and Secret Access Key are required")
  121. try:
  122. session_params = {
  123. 'aws_access_key_id': access_key_id,
  124. 'aws_secret_access_key': secret_access_key,
  125. 'region_name': region_name
  126. }
  127. # Add session token if provided (for temporary credentials)
  128. if session_token:
  129. session_params['aws_session_token'] = session_token
  130. return boto3.Session(**session_params)
  131. except Exception as e:
  132. logger.error(f"Failed to create session with access key: {str(e)}")
  133. raise CredentialError(f"Failed to create session: {str(e)}")
  134. def validate(self) -> bool:
  135. """
  136. Validate that the credentials are valid and working.
  137. Returns:
  138. True if credentials are valid
  139. Raises:
  140. CredentialError: If credentials are invalid
  141. """
  142. try:
  143. session = self.get_session(region_name='us-east-1')
  144. sts_client = session.client('sts')
  145. response = sts_client.get_caller_identity()
  146. self._account_id = response.get('Account')
  147. logger.info(f"Credentials validated for account: {self._account_id}")
  148. return True
  149. except ClientError as e:
  150. error_message = e.response.get('Error', {}).get('Message', str(e))
  151. raise CredentialError(f"Credential validation failed: {error_message}")
  152. except Exception as e:
  153. raise CredentialError(f"Credential validation failed: {str(e)}")
  154. def get_account_id(self) -> str:
  155. """
  156. Get the AWS account ID for the credentials.
  157. Returns:
  158. AWS account ID string
  159. """
  160. if self._account_id:
  161. return self._account_id
  162. try:
  163. session = self.get_session(region_name='us-east-1')
  164. sts_client = session.client('sts')
  165. response = sts_client.get_caller_identity()
  166. self._account_id = response.get('Account')
  167. return self._account_id
  168. except Exception as e:
  169. logger.error(f"Failed to get account ID: {str(e)}")
  170. raise CredentialError(f"Failed to get account ID: {str(e)}")
  171. def create_credential_provider_from_model(
  172. credential,
  173. base_config=None
  174. ) -> AWSCredentialProvider:
  175. """
  176. Create an AWSCredentialProvider from database models.
  177. Args:
  178. credential: AWSCredential model instance
  179. base_config: BaseAssumeRoleConfig model instance (required for assume_role)
  180. Returns:
  181. Configured AWSCredentialProvider
  182. """
  183. credential_config = {}
  184. base_credentials = None
  185. if credential.credential_type == 'assume_role':
  186. credential_config = {
  187. 'role_arn': credential.role_arn,
  188. 'external_id': credential.external_id
  189. }
  190. if base_config:
  191. base_credentials = {
  192. 'access_key_id': base_config.access_key_id,
  193. 'secret_access_key': base_config.get_secret_access_key()
  194. }
  195. # Add session token if available (for temporary credentials)
  196. session_token = base_config.get_session_token()
  197. if session_token:
  198. base_credentials['session_token'] = session_token
  199. else: # access_key
  200. credential_config = {
  201. 'access_key_id': credential.access_key_id,
  202. 'secret_access_key': credential.get_secret_access_key()
  203. }
  204. return AWSCredentialProvider(
  205. credential_type=credential.credential_type,
  206. credential_config=credential_config,
  207. base_credentials=base_credentials
  208. )