| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210 |
- """
- Mock任务模块 - 用于没有Redis时的开发测试
- 当Redis不可用时,可以使用这个模块来模拟Celery任务
- """
- import time
- import threading
- from typing import Dict, Any, List
- import uuid
class MockAsyncResult:
    """Lightweight stand-in for Celery's ``AsyncResult``.

    Attributes:
        id: Identifier of the task this result belongs to.
        state: Current state string; starts as ``'PENDING'`` and is
            overwritten by whoever runs the task.
        info: Arbitrary payload attached to the current state; starts as
            an empty dict.
    """

    def __init__(self, task_id: str):
        self.id = task_id
        self.state = 'PENDING'
        self.info = {}

    def ready(self) -> bool:
        """Return True once the task has reached a terminal state."""
        # Read the attribute once so both comparisons see the same value.
        current = self.state
        return current == 'SUCCESS' or current == 'FAILURE'

    def successful(self) -> bool:
        """Return True when the state is exactly ``'SUCCESS'``."""
        return 'SUCCESS' == self.state

    def failed(self) -> bool:
        """Return True when the state is exactly ``'FAILURE'``."""
        return 'FAILURE' == self.state
class MockCeleryTask:
    """Thread-backed stand-in for a Celery task.

    Wraps a plain callable and exposes ``delay`` / ``apply_async``. Each
    invocation runs the callable on a daemon thread and immediately returns
    a ``MockAsyncResult`` whose ``state`` and ``info`` are updated by that
    thread: ``PROGRESS`` while running, then ``SUCCESS`` with the return
    value, or ``FAILURE`` with ``{'error': <message>}``.
    """

    def __init__(self, func):
        self.func = func
        # Every result handed out so far, keyed by task id.
        self._results = {}

    def _launch(self, task_id, args, kwargs):
        """Register a result for *task_id* and run the callable on a thread."""
        result = MockAsyncResult(task_id)
        self._results[task_id] = result

        def run_task():
            try:
                result.state = 'PROGRESS'
                result.info = {'progress': 0}

                task_result = self.func(*args, **kwargs)

                result.state = 'SUCCESS'
                result.info = task_result
            except Exception as e:
                # Thread boundary: record the failure on the result object
                # instead of letting the exception vanish with the thread.
                result.state = 'FAILURE'
                result.info = {'error': str(e)}

        # Daemon thread so a still-running mock task never blocks interpreter
        # shutdown; the name makes the thread identifiable when debugging.
        worker = threading.Thread(
            target=run_task,
            name=f'mock-task-{task_id}',
            daemon=True,
        )
        worker.start()

        return result

    def delay(self, *args, **kwargs):
        """Run the wrapped callable in the background with the given arguments.

        Returns:
            The ``MockAsyncResult`` tracking this invocation.
        """
        return self._launch(str(uuid.uuid4()), args, kwargs)

    def apply_async(self, args=None, kwargs=None, task_id=None, **options):
        """Run the wrapped callable in the background, Celery-style.

        Args:
            args: Positional arguments for the callable (sequence or None).
            kwargs: Keyword arguments for the callable (mapping or None).
            task_id: Optional explicit id for the result; a random UUID is
                generated when omitted.
            **options: Other execution options (e.g. ``countdown``,
                ``queue``). Accepted so call sites written for the real
                task keep working; the mock ignores them and starts the
                task immediately.

        Returns:
            The ``MockAsyncResult`` tracking this invocation.
        """
        resolved_id = str(task_id) if task_id else str(uuid.uuid4())
        # Copy the containers so later caller-side mutation cannot leak into
        # the running task.
        return self._launch(resolved_id, tuple(args or ()), dict(kwargs or {}))
def mock_scan_aws_resources(task_id: int, credential_ids: List[int],
                            regions: List[str], project_metadata: Dict[str, Any]) -> Dict[str, Any]:
    """Simulate an AWS resource scan without calling any AWS API.

    Marks the task as running, walks through a fixed list of progress steps
    (sleeping 2 seconds before each), writes a plain-text placeholder report
    under ``reports/``, records a ``Report`` row and marks the task
    completed. A ``TaskLog`` row is written at start, at every step, and on
    completion or failure.

    Args:
        task_id: Primary key of the ``Task`` row to update.
        credential_ids: Credential ids; only their count is used.
        regions: Region names; listed in the report and counted in the log.
        project_metadata: Mapping read for ``projectName`` and ``clientName``.

    Returns:
        ``{'status': 'success', ...}`` with ``report_path`` on success, or
        ``{'status': 'error', 'message': ...}`` when the task row does not
        exist or any step raises.
    """
    from app import db
    from app.models import Task, TaskLog, Report
    import json
    import os

    def add_log(level: str, message: str, details: Dict[str, Any]) -> None:
        """Persist one TaskLog row and commit the session."""
        # json.dumps escapes quotes and backslashes, so exception text and
        # Windows-style paths still yield valid JSON in the details column.
        db.session.add(TaskLog(
            task_id=task_id,
            level=level,
            message=message,
            details=json.dumps(details, ensure_ascii=False),
        ))
        db.session.commit()

    print(f"🔄 Mock: 开始扫描任务 {task_id}")

    task = db.session.get(Task, task_id)
    if not task:
        return {'status': 'error', 'message': 'Task not found'}

    try:
        task.status = 'running'
        task.progress = 0
        db.session.commit()

        add_log('info', 'Mock scan started', {
            'mode': 'mock',
            'credentials': len(credential_ids),
            'regions': len(regions),
        })

        # Simulated scan phases as (progress percentage, log message).
        steps = [
            (20, "初始化扫描环境"),
            (40, "扫描VPC资源"),
            (60, "扫描EC2实例"),
            (80, "扫描RDS数据库"),
            (100, "生成报告")
        ]

        for progress, message in steps:
            time.sleep(2)  # simulated scan time
            task.progress = progress
            add_log('info', f'Mock: {message}', {'progress': progress})

            print(f"🔄 Mock: {message} ({progress}%)")

        reports_folder = 'reports'
        os.makedirs(reports_folder, exist_ok=True)

        report_filename = f"mock-report-{task_id}-{int(time.time())}.docx"
        report_path = os.path.join(reports_folder, report_filename)

        # The "report" is a plain-text placeholder despite the .docx name.
        report_lines = [
            "Mock AWS Resource Scan Report",
            "",
            f"Task ID: {task_id}",
            f"Project: {project_metadata.get('projectName', 'Unknown')}",
            f"Client: {project_metadata.get('clientName', 'Unknown')}",
            f"Regions: {', '.join(regions)}",
            f"Credentials: {len(credential_ids)} accounts",
            "This is a mock report generated for testing purposes.",
            "No actual AWS resources were scanned.",
            f"Generated at: {time.strftime('%Y-%m-%d %H:%M:%S')}",
            "",
        ]
        with open(report_path, 'w', encoding='utf-8') as f:
            f.write("\n".join(report_lines))

        file_size = os.path.getsize(report_path)
        report = Report(
            task_id=task_id,
            file_name=report_filename,
            file_path=report_path,
            file_size=file_size
        )
        db.session.add(report)

        task.status = 'completed'
        task.progress = 100

        # The report row, the final task state and the completion log are
        # all flushed by the single commit inside add_log.
        add_log('info', 'Mock scan completed successfully', {
            'report_path': report_path,
            'file_size': file_size,
        })

        print(f"✅ Mock: 任务 {task_id} 完成")

        return {
            'status': 'success',
            'message': 'Mock scan completed',
            'resources_found': 42,  # simulated resource count
            'report_path': report_path
        }

    except Exception as e:
        # If the failure came from the database, the session is unusable
        # until rolled back; do that first so the failure can be recorded.
        db.session.rollback()

        task.status = 'failed'
        add_log('error', f'Mock scan failed: {str(e)}', {
            'error_type': type(e).__name__,
            'error_message': str(e),
        })

        print(f"❌ Mock: 任务 {task_id} 失败: {e}")

        return {
            'status': 'error',
            'message': f'Mock scan failed: {str(e)}'
        }
# Module-level mock task: wraps mock_scan_aws_resources in MockCeleryTask so
# callers can invoke scan_aws_resources.delay(...) / .apply_async(...).
scan_aws_resources = MockCeleryTask(mock_scan_aws_resources)
|