| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128 |
- from datetime import datetime, timezone
- from app import db
- from app.utils.encryption import encrypt_value, decrypt_value
- def format_datetime(dt: datetime) -> str:
- """Format datetime to ISO format with UTC timezone indicator"""
- if dt is None:
- return None
- # If datetime is naive (no timezone), assume it's UTC
- if dt.tzinfo is None:
- dt = dt.replace(tzinfo=timezone.utc)
- return dt.isoformat()
- class AWSCredential(db.Model):
- """AWS Credential model for storing IAM Role or Access Key credentials"""
- __tablename__ = 'aws_credentials'
-
- id = db.Column(db.Integer, primary_key=True)
- name = db.Column(db.String(100), nullable=False)
- credential_type = db.Column(db.Enum('assume_role', 'access_key', name='credential_type'), nullable=False)
- account_id = db.Column(db.String(12), nullable=True, index=True) # Make nullable for access_key type
-
- # For assume_role
- role_arn = db.Column(db.String(255))
- external_id = db.Column(db.String(255))
-
- # For access_key (encrypted)
- access_key_id = db.Column(db.String(255))
- secret_access_key_encrypted = db.Column(db.Text)
-
- created_at = db.Column(db.DateTime, default=datetime.utcnow)
- is_active = db.Column(db.Boolean, default=True)
-
- # Relationships
- users = db.relationship('UserCredential', back_populates='credential', cascade='all, delete-orphan')
-
- def set_secret_access_key(self, secret_key: str) -> None:
- """Encrypt and store the secret access key"""
- self.secret_access_key_encrypted = encrypt_value(secret_key)
-
- def get_secret_access_key(self) -> str:
- """Decrypt and return the secret access key"""
- if self.secret_access_key_encrypted:
- return decrypt_value(self.secret_access_key_encrypted)
- return None
-
- def to_dict(self, mask_sensitive: bool = True) -> dict:
- """Convert credential to dictionary"""
- result = {
- 'id': self.id,
- 'name': self.name,
- 'credential_type': self.credential_type,
- 'account_id': self.account_id,
- 'role_arn': self.role_arn,
- 'external_id': self.external_id,
- 'access_key_id': self.access_key_id,
- 'created_at': format_datetime(self.created_at),
- 'is_active': self.is_active
- }
-
- # Mask sensitive data
- if mask_sensitive and self.access_key_id:
- result['access_key_id'] = self.access_key_id[:4] + '****' + self.access_key_id[-4:] if len(self.access_key_id) > 8 else '****'
-
- return result
-
- def __repr__(self):
- return f'<AWSCredential {self.name} ({self.account_id})>'
- class UserCredential(db.Model):
- """Association table for User-Credential many-to-many relationship"""
- __tablename__ = 'user_credentials'
-
- id = db.Column(db.Integer, primary_key=True)
- user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
- credential_id = db.Column(db.Integer, db.ForeignKey('aws_credentials.id'), nullable=False)
- assigned_at = db.Column(db.DateTime, default=datetime.utcnow)
-
- # Relationships
- user = db.relationship('User', back_populates='credentials')
- credential = db.relationship('AWSCredential', back_populates='users')
-
- # Unique constraint to prevent duplicate assignments
- __table_args__ = (
- db.UniqueConstraint('user_id', 'credential_id', name='unique_user_credential'),
- )
-
- def __repr__(self):
- return f'<UserCredential user={self.user_id} credential={self.credential_id}>'
- class BaseAssumeRoleConfig(db.Model):
- """Configuration for the base account used for Assume Role"""
- __tablename__ = 'base_assume_role_config'
-
- id = db.Column(db.Integer, primary_key=True)
- access_key_id = db.Column(db.String(255), nullable=False)
- secret_access_key_encrypted = db.Column(db.Text, nullable=False)
- updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
-
- def set_secret_access_key(self, secret_key: str) -> None:
- """Encrypt and store the secret access key"""
- self.secret_access_key_encrypted = encrypt_value(secret_key)
-
- def get_secret_access_key(self) -> str:
- """Decrypt and return the secret access key"""
- if self.secret_access_key_encrypted:
- return decrypt_value(self.secret_access_key_encrypted)
- return None
-
- def to_dict(self, mask_sensitive: bool = True) -> dict:
- """Convert config to dictionary"""
- result = {
- 'id': self.id,
- 'access_key_id': self.access_key_id,
- 'updated_at': format_datetime(self.updated_at)
- }
-
- if mask_sensitive:
- result['access_key_id'] = self.access_key_id[:4] + '****' + self.access_key_id[-4:] if len(self.access_key_id) > 8 else '****'
-
- return result
-
- def __repr__(self):
- return f'<BaseAssumeRoleConfig id={self.id}>'
|