| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- from datetime import datetime, timezone
- import json
- from app import db
def format_datetime(dt: datetime) -> str:
    """Render a datetime as an ISO 8601 string that carries a UTC offset.

    Args:
        dt: The value to serialize. ``None`` is accepted and passed through.

    Returns:
        ``None`` when ``dt`` is ``None``; otherwise ``dt.isoformat()``. A naive
        datetime is tagged as UTC first, while an aware datetime keeps the
        offset it already has.
    """
    if dt is None:
        return None
    # Naive values carry no offset; they are assumed to already be in UTC.
    stamped = dt if dt.tzinfo is not None else dt.replace(tzinfo=timezone.utc)
    return stamped.isoformat()
def _load_json_column(raw, empty_factory):
    """Decode a JSON text column, substituting an empty container when unset.

    Args:
        raw: The stored column text, or ``None``/empty when nothing was saved.
        empty_factory: Zero-argument callable (``list`` or ``dict``) that
            builds the fallback value.

    Returns:
        The decoded JSON value, or a fresh ``empty_factory()`` when the column
        is empty or holds a JSON ``null``.
    """
    if not raw:
        return empty_factory()
    decoded = json.loads(raw)
    # json.dumps(None) is stored as the text "null", which decodes back to
    # None; normalise it so the properties always hand out a container.
    return empty_factory() if decoded is None else decoded


class Task(db.Model):
    """Task model for scan tasks.

    Configuration values (credential IDs, regions, project metadata) are
    persisted as JSON text columns and exposed through same-named properties
    that encode/decode them.
    """
    __tablename__ = 'tasks'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(200), nullable=False)
    status = db.Column(
        db.Enum('pending', 'running', 'completed', 'failed', name='task_status'),
        default='pending',
        index=True
    )
    progress = db.Column(db.Integer, default=0)
    created_by = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    # Timestamps are stored naive; created_at defaults to datetime.utcnow.
    created_at = db.Column(db.DateTime, default=datetime.utcnow, index=True)
    started_at = db.Column(db.DateTime)
    completed_at = db.Column(db.DateTime)
    celery_task_id = db.Column(db.String(100), index=True)

    # Task configuration (JSON text; use the properties below to read/write)
    _credential_ids = db.Column('credential_ids', db.Text)
    _regions = db.Column('regions', db.Text)
    _project_metadata = db.Column('project_metadata', db.Text)

    # CloudShell Scanner fields
    source = db.Column(db.String(20), default='credential')  # 'credential' or 'upload'
    scan_data_path = db.Column(db.String(500), nullable=True)  # Path of the uploaded JSON file

    # Relationships
    created_by_user = db.relationship('User', back_populates='tasks')
    logs = db.relationship('TaskLog', back_populates='task', cascade='all, delete-orphan')
    report = db.relationship('Report', back_populates='task', uselist=False, cascade='all, delete-orphan')

    @property
    def credential_ids(self) -> list:
        """Get credential IDs as a list (empty when unset or stored as null)."""
        return _load_json_column(self._credential_ids, list)

    @credential_ids.setter
    def credential_ids(self, value: list):
        """Set credential IDs from a list; the value is stored as JSON text."""
        self._credential_ids = json.dumps(value)

    @property
    def regions(self) -> list:
        """Get regions as a list (empty when unset or stored as null)."""
        return _load_json_column(self._regions, list)

    @regions.setter
    def regions(self, value: list):
        """Set regions from a list; the value is stored as JSON text."""
        self._regions = json.dumps(value)

    @property
    def project_metadata(self) -> dict:
        """Get project metadata as a dict (empty when unset or stored as null)."""
        return _load_json_column(self._project_metadata, dict)

    @project_metadata.setter
    def project_metadata(self, value: dict):
        """Set project metadata from a dict; the value is stored as JSON text."""
        self._project_metadata = json.dumps(value)

    def to_dict(self) -> dict:
        """Convert the task to a plain dictionary.

        Returns:
            A dict of the task's column values, with the three timestamps
            passed through ``format_datetime`` and the JSON-backed fields
            decoded via their properties.
        """
        return {
            'id': self.id,
            'name': self.name,
            'status': self.status,
            'progress': self.progress,
            'created_by': self.created_by,
            'created_at': format_datetime(self.created_at),
            'started_at': format_datetime(self.started_at),
            'completed_at': format_datetime(self.completed_at),
            'credential_ids': self.credential_ids,
            'regions': self.regions,
            'project_metadata': self.project_metadata,
            'source': self.source,
            'scan_data_path': self.scan_data_path
        }

    def __repr__(self):
        return f'<Task {self.id} {self.name} ({self.status})>'
class TaskLog(db.Model):
    """A single log entry recorded for a task's execution."""
    __tablename__ = 'task_logs'

    id = db.Column(db.Integer, primary_key=True)
    task_id = db.Column(
        db.Integer,
        db.ForeignKey('tasks.id'),
        nullable=False,
        index=True,
    )
    level = db.Column(
        db.Enum('info', 'warning', 'error', name='log_level'),
        default='info',
    )
    message = db.Column(db.Text, nullable=False)
    # Optional JSON text (stack trace, etc.); decoded by to_dict().
    details = db.Column(db.Text)
    created_at = db.Column(db.DateTime, default=datetime.utcnow, index=True)

    # Owning task (other side of Task.logs)
    task = db.relationship('Task', back_populates='logs')

    def to_dict(self) -> dict:
        """Return the log entry as a plain dictionary.

        The ``details`` key is included only when the column is non-empty, in
        which case its JSON text is decoded.
        """
        entry = {
            'id': self.id,
            'task_id': self.task_id,
            'level': self.level,
            'message': self.message,
            'created_at': format_datetime(self.created_at)
        }
        if not self.details:
            return entry
        entry['details'] = json.loads(self.details)
        return entry

    def __repr__(self):
        # Only the first 50 characters of the message are shown.
        preview = self.message[:50]
        return f'<TaskLog {self.id} [{self.level}] {preview}>'
|