"""Mock task module for development and testing without Redis.

When Redis is unavailable, this module can be used to simulate the Celery
tasks: the wrapped function runs on a background thread and its progress is
tracked by a minimal ``AsyncResult`` look-alike.
"""
import json
import os
import threading
import time
import uuid
from typing import Any, Dict, List


class MockAsyncResult:
    """Minimal stand-in for Celery's ``AsyncResult``.

    Attributes are plain and mutable; ``MockCeleryTask`` updates them from
    its worker thread.
    """

    def __init__(self, task_id: str):
        self.id = task_id
        # One of 'PENDING', 'PROGRESS', 'SUCCESS', 'FAILURE'.
        self.state = 'PENDING'
        # Progress dict while running, the task's return value on success,
        # or {'error': <message>} on failure.
        self.info = {}

    def ready(self) -> bool:
        """Return True once the task has finished, successfully or not."""
        return self.state in ('SUCCESS', 'FAILURE')

    def successful(self) -> bool:
        """Return True if the task finished without raising."""
        return self.state == 'SUCCESS'

    def failed(self) -> bool:
        """Return True if the task raised an exception."""
        return self.state == 'FAILURE'


class MockCeleryTask:
    """Wrap a plain function so it exposes ``delay`` / ``apply_async``."""

    def __init__(self, func):
        self.func = func
        # task_id -> MockAsyncResult for every dispatched call.
        self._results = {}

    def delay(self, *args, **kwargs):
        """Run the wrapped function on a daemon thread.

        Returns:
            The ``MockAsyncResult`` tracking this call. It is returned
            immediately; its ``state`` and ``info`` are updated by the
            worker thread.
        """
        task_id = str(uuid.uuid4())

        result = MockAsyncResult(task_id)
        self._results[task_id] = result

        def run_task():
            try:
                result.info = {'progress': 0}
                result.state = 'PROGRESS'

                task_result = self.func(*args, **kwargs)
            except Exception as e:
                # Publish the payload before the terminal state so a caller
                # polling ready() never observes a stale ``info``.
                result.info = {'error': str(e)}
                result.state = 'FAILURE'
            else:
                result.info = task_result
                result.state = 'SUCCESS'

        thread = threading.Thread(target=run_task, daemon=True)
        thread.start()

        return result

    def apply_async(self, args=None, kwargs=None):
        """Dispatch like ``delay`` but with args/kwargs given as containers."""
        args = args or []
        kwargs = kwargs or {}
        return self.delay(*args, **kwargs)


def _json_details(**fields: Any) -> str:
    """Serialize log details as JSON so quotes and backslashes are escaped."""
    return json.dumps(fields, ensure_ascii=False)


def _write_mock_report(report_path: str, task_id: int,
                       credential_ids: List[int], regions: List[str],
                       project_metadata: Dict[str, Any]) -> None:
    """Write the plain-text placeholder report to ``report_path``."""
    lines = [
        "Mock AWS Resource Scan Report",
        "",
        f"Task ID: {task_id}",
        f"Project: {project_metadata.get('projectName', 'Unknown')}",
        f"Client: {project_metadata.get('clientName', 'Unknown')}",
        f"Regions: {', '.join(regions)}",
        f"Credentials: {len(credential_ids)} accounts",
        "",
        "This is a mock report generated for testing purposes.",
        "No actual AWS resources were scanned.",
        "",
        f"Generated at: {time.strftime('%Y-%m-%d %H:%M:%S')}",
    ]
    with open(report_path, 'w', encoding='utf-8') as f:
        f.write("\n".join(lines) + "\n")


def mock_scan_aws_resources(task_id: int, credential_ids: List[int],
                            regions: List[str],
                            project_metadata: Dict[str, Any]) -> Dict[str, Any]:
    """Simulate an AWS resource scan without calling any AWS API.

    Walks through five fixed progress steps (sleeping 2 seconds each),
    records a ``TaskLog`` row per step, writes a text placeholder report
    under ``reports/`` and stores a ``Report`` row for it.

    Args:
        task_id: Primary key of the ``Task`` row to update.
        credential_ids: Credential ids; only their count is used.
        regions: Region names; listed in the report and counted in the log.
        project_metadata: Mapping read for ``projectName`` and ``clientName``.

    Returns:
        ``{'status': 'success', ...}`` with ``report_path`` on success, or
        ``{'status': 'error', 'message': ...}`` if the task is missing or
        any step raised.

    NOTE(review): when dispatched through ``MockCeleryTask`` this runs on a
    plain background thread; if ``db`` is Flask-SQLAlchemy it presumably
    needs an application context there - confirm against the caller.
    """
    # Imported lazily so this module can be imported without the app package.
    from app import db
    from app.models import Task, TaskLog, Report

    print(f"🔄 Mock: 开始扫描任务 {task_id}")

    task = db.session.get(Task, task_id)
    if not task:
        return {'status': 'error', 'message': 'Task not found'}

    try:
        task.status = 'running'
        task.progress = 0
        db.session.commit()

        # Start-of-scan log entry.
        log = TaskLog(
            task_id=task_id,
            level='info',
            message='Mock scan started',
            details=_json_details(
                mode='mock',
                credentials=len(credential_ids),
                regions=len(regions),
            ),
        )
        db.session.add(log)
        db.session.commit()

        # Simulated scan phases: (progress percentage, log message).
        steps = [
            (20, "初始化扫描环境"),
            (40, "扫描VPC资源"),
            (60, "扫描EC2实例"),
            (80, "扫描RDS数据库"),
            (100, "生成报告"),
        ]

        for progress, message in steps:
            time.sleep(2)  # simulate scan time

            task.progress = progress

            log = TaskLog(
                task_id=task_id,
                level='info',
                message=f'Mock: {message}',
                details=_json_details(progress=progress),
            )
            db.session.add(log)
            db.session.commit()

            print(f"🔄 Mock: {message} ({progress}%)")

        # Generate the placeholder report (plain text despite the .docx name).
        reports_folder = 'reports'
        os.makedirs(reports_folder, exist_ok=True)

        report_filename = f"mock-report-{task_id}-{int(time.time())}.docx"
        report_path = os.path.join(reports_folder, report_filename)
        _write_mock_report(report_path, task_id, credential_ids, regions,
                           project_metadata)

        report = Report(
            task_id=task_id,
            file_name=report_filename,
            file_path=report_path,
            file_size=os.path.getsize(report_path),
        )
        db.session.add(report)

        task.status = 'completed'
        task.progress = 100

        log = TaskLog(
            task_id=task_id,
            level='info',
            message='Mock scan completed successfully',
            details=_json_details(
                report_path=report_path,
                file_size=report.file_size,
            ),
        )
        db.session.add(log)
        db.session.commit()

        print(f"✅ Mock: 任务 {task_id} 完成")

        return {
            'status': 'success',
            'message': 'Mock scan completed',
            'resources_found': 42,  # simulated number of resources found
            'report_path': report_path,
        }

    except Exception as e:
        # The failure may have come from the session itself (e.g. a failed
        # commit); clear that state so the error can actually be recorded.
        db.session.rollback()

        task.status = 'failed'

        log = TaskLog(
            task_id=task_id,
            level='error',
            message=f'Mock scan failed: {str(e)}',
            details=_json_details(
                error_type=type(e).__name__,
                error_message=str(e),
            ),
        )
        db.session.add(log)
        db.session.commit()

        print(f"❌ Mock: 任务 {task_id} 失败: {e}")

        return {
            'status': 'error',
            'message': f'Mock scan failed: {str(e)}'
        }


# Create the mock task
scan_aws_resources = MockCeleryTask(mock_scan_aws_resources)