credentials.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. """
  2. AWS Credential Management Module
  3. This module handles AWS credential acquisition for both Assume Role and Access Key
  4. authentication methods.
  5. Requirements:
  6. - 2.2: Use centralized base account for Assume Role
  7. - 2.3: Securely store and use Access Key credentials
  8. """
  9. import boto3
  10. from botocore.exceptions import ClientError, NoCredentialsError
  11. from typing import Dict, Any, Optional
  12. import logging
  13. logger = logging.getLogger(__name__)
  14. class CredentialError(Exception):
  15. """Exception raised for credential-related errors"""
  16. pass
  17. class AWSCredentialProvider:
  18. """
  19. Provides AWS credentials using either Assume Role or Access Key methods.
  20. This class handles the complexity of obtaining AWS credentials from different
  21. sources and provides a unified interface for the scanner.
  22. """
  23. def __init__(
  24. self,
  25. credential_type: str,
  26. credential_config: Dict[str, Any],
  27. base_credentials: Optional[Dict[str, str]] = None
  28. ):
  29. """
  30. Initialize the credential provider.
  31. Args:
  32. credential_type: Either 'assume_role' or 'access_key'
  33. credential_config: Configuration for the credential
  34. For assume_role: {'role_arn': str, 'external_id': str (optional)}
  35. For access_key: {'access_key_id': str, 'secret_access_key': str}
  36. base_credentials: Base account credentials for Assume Role
  37. {'access_key_id': str, 'secret_access_key': str}
  38. """
  39. self.credential_type = credential_type
  40. self.credential_config = credential_config
  41. self.base_credentials = base_credentials
  42. self._session: Optional[boto3.Session] = None
  43. self._account_id: Optional[str] = None
  44. def get_session(self, region_name: Optional[str] = None) -> boto3.Session:
  45. """
  46. Get a boto3 Session with the configured credentials.
  47. Args:
  48. region_name: Optional region for the session
  49. Returns:
  50. boto3.Session configured with appropriate credentials
  51. Raises:
  52. CredentialError: If credentials cannot be obtained
  53. """
  54. if self.credential_type == 'assume_role':
  55. return self._get_assume_role_session(region_name)
  56. elif self.credential_type == 'access_key':
  57. return self._get_access_key_session(region_name)
  58. else:
  59. raise CredentialError(f"Unknown credential type: {self.credential_type}")
  60. def _get_assume_role_session(self, region_name: Optional[str] = None) -> boto3.Session:
  61. """
  62. Get a session using Assume Role.
  63. Uses the base account credentials to assume a role in the target account.
  64. """
  65. role_arn = self.credential_config.get('role_arn')
  66. external_id = self.credential_config.get('external_id')
  67. if not role_arn:
  68. raise CredentialError("Role ARN is required for assume_role credential type")
  69. if not self.base_credentials:
  70. raise CredentialError("Base credentials are required for assume_role")
  71. try:
  72. # Create base session with the centralized account credentials
  73. base_session = boto3.Session(
  74. aws_access_key_id=self.base_credentials.get('access_key_id'),
  75. aws_secret_access_key=self.base_credentials.get('secret_access_key'),
  76. region_name=region_name or 'us-east-1'
  77. )
  78. # Use STS to assume the role
  79. sts_client = base_session.client('sts')
  80. assume_role_params = {
  81. 'RoleArn': role_arn,
  82. 'RoleSessionName': 'AWSResourceScanner',
  83. 'DurationSeconds': 3600 # 1 hour
  84. }
  85. if external_id:
  86. assume_role_params['ExternalId'] = external_id
  87. response = sts_client.assume_role(**assume_role_params)
  88. credentials = response['Credentials']
  89. # Create a new session with the assumed role credentials
  90. return boto3.Session(
  91. aws_access_key_id=credentials['AccessKeyId'],
  92. aws_secret_access_key=credentials['SecretAccessKey'],
  93. aws_session_token=credentials['SessionToken'],
  94. region_name=region_name
  95. )
  96. except ClientError as e:
  97. error_code = e.response.get('Error', {}).get('Code', 'Unknown')
  98. error_message = e.response.get('Error', {}).get('Message', str(e))
  99. logger.error(f"Failed to assume role {role_arn}: {error_code} - {error_message}")
  100. raise CredentialError(f"Failed to assume role: {error_message}")
  101. except NoCredentialsError:
  102. raise CredentialError("Base credentials are invalid or not configured")
  103. except Exception as e:
  104. logger.error(f"Unexpected error assuming role: {str(e)}")
  105. raise CredentialError(f"Unexpected error: {str(e)}")
  106. def _get_access_key_session(self, region_name: Optional[str] = None) -> boto3.Session:
  107. """
  108. Get a session using Access Key credentials.
  109. """
  110. access_key_id = self.credential_config.get('access_key_id')
  111. secret_access_key = self.credential_config.get('secret_access_key')
  112. if not access_key_id or not secret_access_key:
  113. raise CredentialError("Access Key ID and Secret Access Key are required")
  114. try:
  115. return boto3.Session(
  116. aws_access_key_id=access_key_id,
  117. aws_secret_access_key=secret_access_key,
  118. region_name=region_name
  119. )
  120. except Exception as e:
  121. logger.error(f"Failed to create session with access key: {str(e)}")
  122. raise CredentialError(f"Failed to create session: {str(e)}")
  123. def validate(self) -> bool:
  124. """
  125. Validate that the credentials are valid and working.
  126. Returns:
  127. True if credentials are valid
  128. Raises:
  129. CredentialError: If credentials are invalid
  130. """
  131. try:
  132. session = self.get_session(region_name='us-east-1')
  133. sts_client = session.client('sts')
  134. response = sts_client.get_caller_identity()
  135. self._account_id = response.get('Account')
  136. logger.info(f"Credentials validated for account: {self._account_id}")
  137. return True
  138. except ClientError as e:
  139. error_message = e.response.get('Error', {}).get('Message', str(e))
  140. raise CredentialError(f"Credential validation failed: {error_message}")
  141. except Exception as e:
  142. raise CredentialError(f"Credential validation failed: {str(e)}")
  143. def get_account_id(self) -> str:
  144. """
  145. Get the AWS account ID for the credentials.
  146. Returns:
  147. AWS account ID string
  148. """
  149. if self._account_id:
  150. return self._account_id
  151. try:
  152. session = self.get_session(region_name='us-east-1')
  153. sts_client = session.client('sts')
  154. response = sts_client.get_caller_identity()
  155. self._account_id = response.get('Account')
  156. return self._account_id
  157. except Exception as e:
  158. logger.error(f"Failed to get account ID: {str(e)}")
  159. raise CredentialError(f"Failed to get account ID: {str(e)}")
  160. def create_credential_provider_from_model(
  161. credential,
  162. base_config=None
  163. ) -> AWSCredentialProvider:
  164. """
  165. Create an AWSCredentialProvider from database models.
  166. Args:
  167. credential: AWSCredential model instance
  168. base_config: BaseAssumeRoleConfig model instance (required for assume_role)
  169. Returns:
  170. Configured AWSCredentialProvider
  171. """
  172. credential_config = {}
  173. base_credentials = None
  174. if credential.credential_type == 'assume_role':
  175. credential_config = {
  176. 'role_arn': credential.role_arn,
  177. 'external_id': credential.external_id
  178. }
  179. if base_config:
  180. base_credentials = {
  181. 'access_key_id': base_config.access_key_id,
  182. 'secret_access_key': base_config.get_secret_access_key()
  183. }
  184. else: # access_key
  185. credential_config = {
  186. 'access_key_id': credential.access_key_id,
  187. 'secret_access_key': credential.get_secret_access_key()
  188. }
  189. return AWSCredentialProvider(
  190. credential_type=credential.credential_type,
  191. credential_config=credential_config,
  192. base_credentials=base_credentials
  193. )