from datetime import datetime, timezone
import json
from typing import Optional

from app import db


def _utcnow() -> datetime:
    """Return the current UTC time as a naive datetime.

    Used as the column default instead of the deprecated
    ``datetime.utcnow``. The result is kept naive because the DateTime
    columns below are declared without ``timezone=True`` and
    ``format_datetime`` interprets naive values as UTC.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _load_json(raw: Optional[str], fallback):
    """Decode a JSON text column, returning ``fallback`` when it is unset.

    Args:
        raw: The stored JSON text, or ``None``/empty if never assigned.
        fallback: Value returned when ``raw`` is empty or decodes to
            ``None`` (the setters store the text ``'null'`` for ``None``).

    Returns:
        The decoded value, or ``fallback``.

    Raises:
        json.JSONDecodeError: If ``raw`` is non-empty but not valid JSON.
    """
    if not raw:
        return fallback
    decoded = json.loads(raw)
    return fallback if decoded is None else decoded


def format_datetime(dt: Optional[datetime]) -> Optional[str]:
    """Format a datetime as an ISO 8601 string carrying a timezone offset.

    Args:
        dt: The datetime to format, or ``None``.

    Returns:
        ``None`` when ``dt`` is ``None``; otherwise ``dt.isoformat()``.
        A naive datetime is assumed to be UTC and is tagged as such
        before formatting; an aware datetime keeps its own offset.
    """
    if dt is None:
        return None
    # If datetime is naive (no timezone), assume it's UTC
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.isoformat()


class Task(db.Model):
    """Task model for scan tasks.

    The list/dict configuration values (credential IDs, regions, project
    metadata) are persisted as JSON text in underscore-prefixed columns
    and exposed through properties that encode/decode on access.
    """

    __tablename__ = 'tasks'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(200), nullable=False)
    status = db.Column(
        db.Enum('pending', 'running', 'completed', 'failed', name='task_status'),
        default='pending',
        index=True
    )
    progress = db.Column(db.Integer, default=0)
    created_by = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    created_at = db.Column(db.DateTime, default=_utcnow, index=True)
    started_at = db.Column(db.DateTime)
    completed_at = db.Column(db.DateTime)
    celery_task_id = db.Column(db.String(100), index=True)

    # Task configuration (JSON text; use the properties below to read/write)
    _credential_ids = db.Column('credential_ids', db.Text)
    _regions = db.Column('regions', db.Text)
    _project_metadata = db.Column('project_metadata', db.Text)

    # CloudShell Scanner fields
    source = db.Column(db.String(20), default='credential')  # 'credential' or 'upload'
    scan_data_path = db.Column(db.String(500), nullable=True)  # path of the uploaded JSON file

    # Relationships
    created_by_user = db.relationship('User', back_populates='tasks')
    logs = db.relationship('TaskLog', back_populates='task', cascade='all, delete-orphan')
    report = db.relationship('Report', back_populates='task', uselist=False, cascade='all, delete-orphan')

    @property
    def credential_ids(self) -> list:
        """Get credential IDs as a list (empty list when unset)."""
        return _load_json(self._credential_ids, [])

    @credential_ids.setter
    def credential_ids(self, value: list):
        """Set credential IDs from a list; stored as JSON text."""
        self._credential_ids = json.dumps(value)

    @property
    def regions(self) -> list:
        """Get regions as a list (empty list when unset)."""
        return _load_json(self._regions, [])

    @regions.setter
    def regions(self, value: list):
        """Set regions from a list; stored as JSON text."""
        self._regions = json.dumps(value)

    @property
    def project_metadata(self) -> dict:
        """Get project metadata as a dict (empty dict when unset)."""
        return _load_json(self._project_metadata, {})

    @project_metadata.setter
    def project_metadata(self, value: dict):
        """Set project metadata from a dict; stored as JSON text."""
        self._project_metadata = json.dumps(value)

    def to_dict(self) -> dict:
        """Convert the task to a plain dictionary.

        Timestamps are rendered with ``format_datetime`` and the JSON
        columns are returned in decoded form.
        """
        return {
            'id': self.id,
            'name': self.name,
            'status': self.status,
            'progress': self.progress,
            'created_by': self.created_by,
            'created_at': format_datetime(self.created_at),
            'started_at': format_datetime(self.started_at),
            'completed_at': format_datetime(self.completed_at),
            'credential_ids': self.credential_ids,
            'regions': self.regions,
            'project_metadata': self.project_metadata,
            'source': self.source,
            'scan_data_path': self.scan_data_path
        }

    def __repr__(self):
        """Return a short debug representation identifying the task."""
        return f'<Task {self.id}: {self.name}>'


class TaskLog(db.Model):
    """Task log model for storing task execution logs."""

    __tablename__ = 'task_logs'

    id = db.Column(db.Integer, primary_key=True)
    task_id = db.Column(db.Integer, db.ForeignKey('tasks.id'), nullable=False, index=True)
    level = db.Column(db.Enum('info', 'warning', 'error', name='log_level'), default='info')
    message = db.Column(db.Text, nullable=False)
    details = db.Column(db.Text)  # JSON for stack trace, etc.
    created_at = db.Column(db.DateTime, default=_utcnow, index=True)

    # Relationships
    task = db.relationship('Task', back_populates='logs')

    def to_dict(self) -> dict:
        """Convert the log entry to a plain dictionary.

        The ``details`` key is included only when the column is
        non-empty, in which case its JSON text is decoded.

        Raises:
            json.JSONDecodeError: If ``details`` is set but not valid JSON.
        """
        result = {
            'id': self.id,
            'task_id': self.task_id,
            'level': self.level,
            'message': self.message,
            'created_at': format_datetime(self.created_at)
        }
        if self.details:
            result['details'] = json.loads(self.details)
        return result

    def __repr__(self):
        """Return a short debug representation identifying the log entry."""
        return f'<TaskLog {self.id} [{self.level}] task={self.task_id}>'