credential.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. from datetime import datetime, timezone
  2. from app import db
  3. from app.utils.encryption import encrypt_value, decrypt_value
  4. def format_datetime(dt: datetime) -> str:
  5. """Format datetime to ISO format with UTC timezone indicator"""
  6. if dt is None:
  7. return None
  8. # If datetime is naive (no timezone), assume it's UTC
  9. if dt.tzinfo is None:
  10. dt = dt.replace(tzinfo=timezone.utc)
  11. return dt.isoformat()
  12. class AWSCredential(db.Model):
  13. """AWS Credential model for storing IAM Role or Access Key credentials"""
  14. __tablename__ = 'aws_credentials'
  15. id = db.Column(db.Integer, primary_key=True)
  16. name = db.Column(db.String(100), nullable=False)
  17. credential_type = db.Column(db.Enum('assume_role', 'access_key', name='credential_type'), nullable=False)
  18. account_id = db.Column(db.String(12), nullable=True, index=True) # Make nullable for access_key type
  19. # For assume_role
  20. role_arn = db.Column(db.String(255))
  21. external_id = db.Column(db.String(255))
  22. # For access_key (encrypted)
  23. access_key_id = db.Column(db.String(255))
  24. secret_access_key_encrypted = db.Column(db.Text)
  25. created_at = db.Column(db.DateTime, default=datetime.utcnow)
  26. is_active = db.Column(db.Boolean, default=True)
  27. # Relationships
  28. users = db.relationship('UserCredential', back_populates='credential', cascade='all, delete-orphan')
  29. def set_secret_access_key(self, secret_key: str) -> None:
  30. """Encrypt and store the secret access key"""
  31. self.secret_access_key_encrypted = encrypt_value(secret_key)
  32. def get_secret_access_key(self) -> str:
  33. """Decrypt and return the secret access key"""
  34. if self.secret_access_key_encrypted:
  35. return decrypt_value(self.secret_access_key_encrypted)
  36. return None
  37. def to_dict(self, mask_sensitive: bool = True) -> dict:
  38. """Convert credential to dictionary"""
  39. result = {
  40. 'id': self.id,
  41. 'name': self.name,
  42. 'credential_type': self.credential_type,
  43. 'account_id': self.account_id,
  44. 'role_arn': self.role_arn,
  45. 'external_id': self.external_id,
  46. 'access_key_id': self.access_key_id,
  47. 'created_at': format_datetime(self.created_at),
  48. 'is_active': self.is_active
  49. }
  50. # Mask sensitive data
  51. if mask_sensitive and self.access_key_id:
  52. result['access_key_id'] = self.access_key_id[:4] + '****' + self.access_key_id[-4:] if len(self.access_key_id) > 8 else '****'
  53. return result
  54. def __repr__(self):
  55. return f'<AWSCredential {self.name} ({self.account_id})>'
  56. class UserCredential(db.Model):
  57. """Association table for User-Credential many-to-many relationship"""
  58. __tablename__ = 'user_credentials'
  59. id = db.Column(db.Integer, primary_key=True)
  60. user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
  61. credential_id = db.Column(db.Integer, db.ForeignKey('aws_credentials.id'), nullable=False)
  62. assigned_at = db.Column(db.DateTime, default=datetime.utcnow)
  63. # Relationships
  64. user = db.relationship('User', back_populates='credentials')
  65. credential = db.relationship('AWSCredential', back_populates='users')
  66. # Unique constraint to prevent duplicate assignments
  67. __table_args__ = (
  68. db.UniqueConstraint('user_id', 'credential_id', name='unique_user_credential'),
  69. )
  70. def __repr__(self):
  71. return f'<UserCredential user={self.user_id} credential={self.credential_id}>'
  72. class BaseAssumeRoleConfig(db.Model):
  73. """Configuration for the base account used for Assume Role"""
  74. __tablename__ = 'base_assume_role_config'
  75. id = db.Column(db.Integer, primary_key=True)
  76. access_key_id = db.Column(db.String(255), nullable=False)
  77. secret_access_key_encrypted = db.Column(db.Text, nullable=False)
  78. updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
  79. def set_secret_access_key(self, secret_key: str) -> None:
  80. """Encrypt and store the secret access key"""
  81. self.secret_access_key_encrypted = encrypt_value(secret_key)
  82. def get_secret_access_key(self) -> str:
  83. """Decrypt and return the secret access key"""
  84. if self.secret_access_key_encrypted:
  85. return decrypt_value(self.secret_access_key_encrypted)
  86. return None
  87. def to_dict(self, mask_sensitive: bool = True) -> dict:
  88. """Convert config to dictionary"""
  89. result = {
  90. 'id': self.id,
  91. 'access_key_id': self.access_key_id,
  92. 'updated_at': format_datetime(self.updated_at)
  93. }
  94. if mask_sensitive:
  95. result['access_key_id'] = self.access_key_id[:4] + '****' + self.access_key_id[-4:] if len(self.access_key_id) > 8 else '****'
  96. return result
  97. def __repr__(self):
  98. return f'<BaseAssumeRoleConfig id={self.id}>'