credential.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. from datetime import datetime
  2. from app import db
  3. from app.utils.encryption import encrypt_value, decrypt_value
  4. class AWSCredential(db.Model):
  5. """AWS Credential model for storing IAM Role or Access Key credentials"""
  6. __tablename__ = 'aws_credentials'
  7. id = db.Column(db.Integer, primary_key=True)
  8. name = db.Column(db.String(100), nullable=False)
  9. credential_type = db.Column(db.Enum('assume_role', 'access_key', name='credential_type'), nullable=False)
  10. account_id = db.Column(db.String(12), nullable=True, index=True) # Make nullable for access_key type
  11. # For assume_role
  12. role_arn = db.Column(db.String(255))
  13. external_id = db.Column(db.String(255))
  14. # For access_key (encrypted)
  15. access_key_id = db.Column(db.String(255))
  16. secret_access_key_encrypted = db.Column(db.Text)
  17. created_at = db.Column(db.DateTime, default=datetime.utcnow)
  18. is_active = db.Column(db.Boolean, default=True)
  19. # Relationships
  20. users = db.relationship('UserCredential', back_populates='credential', cascade='all, delete-orphan')
  21. def set_secret_access_key(self, secret_key: str) -> None:
  22. """Encrypt and store the secret access key"""
  23. self.secret_access_key_encrypted = encrypt_value(secret_key)
  24. def get_secret_access_key(self) -> str:
  25. """Decrypt and return the secret access key"""
  26. if self.secret_access_key_encrypted:
  27. return decrypt_value(self.secret_access_key_encrypted)
  28. return None
  29. def to_dict(self, mask_sensitive: bool = True) -> dict:
  30. """Convert credential to dictionary"""
  31. result = {
  32. 'id': self.id,
  33. 'name': self.name,
  34. 'credential_type': self.credential_type,
  35. 'account_id': self.account_id,
  36. 'role_arn': self.role_arn,
  37. 'external_id': self.external_id,
  38. 'access_key_id': self.access_key_id,
  39. 'created_at': self.created_at.isoformat() if self.created_at else None,
  40. 'is_active': self.is_active
  41. }
  42. # Mask sensitive data
  43. if mask_sensitive and self.access_key_id:
  44. result['access_key_id'] = self.access_key_id[:4] + '****' + self.access_key_id[-4:] if len(self.access_key_id) > 8 else '****'
  45. return result
  46. def __repr__(self):
  47. return f'<AWSCredential {self.name} ({self.account_id})>'
  48. class UserCredential(db.Model):
  49. """Association table for User-Credential many-to-many relationship"""
  50. __tablename__ = 'user_credentials'
  51. id = db.Column(db.Integer, primary_key=True)
  52. user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
  53. credential_id = db.Column(db.Integer, db.ForeignKey('aws_credentials.id'), nullable=False)
  54. assigned_at = db.Column(db.DateTime, default=datetime.utcnow)
  55. # Relationships
  56. user = db.relationship('User', back_populates='credentials')
  57. credential = db.relationship('AWSCredential', back_populates='users')
  58. # Unique constraint to prevent duplicate assignments
  59. __table_args__ = (
  60. db.UniqueConstraint('user_id', 'credential_id', name='unique_user_credential'),
  61. )
  62. def __repr__(self):
  63. return f'<UserCredential user={self.user_id} credential={self.credential_id}>'
  64. class BaseAssumeRoleConfig(db.Model):
  65. """Configuration for the base account used for Assume Role"""
  66. __tablename__ = 'base_assume_role_config'
  67. id = db.Column(db.Integer, primary_key=True)
  68. access_key_id = db.Column(db.String(255), nullable=False)
  69. secret_access_key_encrypted = db.Column(db.Text, nullable=False)
  70. updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
  71. def set_secret_access_key(self, secret_key: str) -> None:
  72. """Encrypt and store the secret access key"""
  73. self.secret_access_key_encrypted = encrypt_value(secret_key)
  74. def get_secret_access_key(self) -> str:
  75. """Decrypt and return the secret access key"""
  76. if self.secret_access_key_encrypted:
  77. return decrypt_value(self.secret_access_key_encrypted)
  78. return None
  79. def to_dict(self, mask_sensitive: bool = True) -> dict:
  80. """Convert config to dictionary"""
  81. result = {
  82. 'id': self.id,
  83. 'access_key_id': self.access_key_id,
  84. 'updated_at': self.updated_at.isoformat() if self.updated_at else None
  85. }
  86. if mask_sensitive:
  87. result['access_key_id'] = self.access_key_id[:4] + '****' + self.access_key_id[-4:] if len(self.access_key_id) > 8 else '****'
  88. return result
  89. def __repr__(self):
  90. return f'<BaseAssumeRoleConfig id={self.id}>'