|
|
@@ -78,6 +78,7 @@ class AWSCredentialProvider:
|
|
|
Get a session using Assume Role.
|
|
|
|
|
|
Uses the base account credentials to assume a role in the target account.
|
|
|
+ Supports optional session token for temporary base credentials.
|
|
|
"""
|
|
|
role_arn = self.credential_config.get('role_arn')
|
|
|
external_id = self.credential_config.get('external_id')
|
|
|
@@ -90,11 +91,16 @@ class AWSCredentialProvider:
|
|
|
|
|
|
try:
|
|
|
# Create base session with the centralized account credentials
|
|
|
- base_session = boto3.Session(
|
|
|
- aws_access_key_id=self.base_credentials.get('access_key_id'),
|
|
|
- aws_secret_access_key=self.base_credentials.get('secret_access_key'),
|
|
|
- region_name=region_name or 'us-east-1'
|
|
|
- )
|
|
|
+ base_session_params = {
|
|
|
+ 'aws_access_key_id': self.base_credentials.get('access_key_id'),
|
|
|
+ 'aws_secret_access_key': self.base_credentials.get('secret_access_key'),
|
|
|
+ 'region_name': region_name or 'us-east-1'
|
|
|
+ }
|
|
|
+ # Add session token if provided (for temporary credentials)
|
|
|
+ if self.base_credentials.get('session_token'):
|
|
|
+ base_session_params['aws_session_token'] = self.base_credentials.get('session_token')
|
|
|
+
|
|
|
+ base_session = boto3.Session(**base_session_params)
|
|
|
|
|
|
# Use STS to assume the role
|
|
|
sts_client = base_session.client('sts')
|
|
|
@@ -133,19 +139,26 @@ class AWSCredentialProvider:
|
|
|
def _get_access_key_session(self, region_name: Optional[str] = None) -> boto3.Session:
|
|
|
"""
|
|
|
Get a session using Access Key credentials.
|
|
|
+ Supports optional session token for temporary credentials.
|
|
|
"""
|
|
|
access_key_id = self.credential_config.get('access_key_id')
|
|
|
secret_access_key = self.credential_config.get('secret_access_key')
|
|
|
+ session_token = self.credential_config.get('session_token')
|
|
|
|
|
|
if not access_key_id or not secret_access_key:
|
|
|
raise CredentialError("Access Key ID and Secret Access Key are required")
|
|
|
|
|
|
try:
|
|
|
- return boto3.Session(
|
|
|
- aws_access_key_id=access_key_id,
|
|
|
- aws_secret_access_key=secret_access_key,
|
|
|
- region_name=region_name
|
|
|
- )
|
|
|
+ session_params = {
|
|
|
+ 'aws_access_key_id': access_key_id,
|
|
|
+ 'aws_secret_access_key': secret_access_key,
|
|
|
+ 'region_name': region_name
|
|
|
+ }
|
|
|
+ # Add session token if provided (for temporary credentials)
|
|
|
+ if session_token:
|
|
|
+ session_params['aws_session_token'] = session_token
|
|
|
+
|
|
|
+ return boto3.Session(**session_params)
|
|
|
except Exception as e:
|
|
|
logger.error(f"Failed to create session with access key: {str(e)}")
|
|
|
raise CredentialError(f"Failed to create session: {str(e)}")
|
|
|
@@ -222,6 +235,10 @@ def create_credential_provider_from_model(
|
|
|
'access_key_id': base_config.access_key_id,
|
|
|
'secret_access_key': base_config.get_secret_access_key()
|
|
|
}
|
|
|
+ # Add session token if available (for temporary credentials)
|
|
|
+ session_token = base_config.get_session_token()
|
|
|
+ if session_token:
|
|
|
+ base_credentials['session_token'] = session_token
|
|
|
else: # access_key
|
|
|
credential_config = {
|
|
|
'access_key_id': credential.access_key_id,
|