mock_tasks.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. """
  2. Mock任务模块 - 用于没有Redis时的开发测试
  3. 当Redis不可用时,可以使用这个模块来模拟Celery任务
  4. """
  5. import time
  6. import threading
  7. from typing import Dict, Any, List
  8. import uuid
  9. class MockAsyncResult:
  10. """模拟Celery AsyncResult"""
  11. def __init__(self, task_id: str):
  12. self.id = task_id
  13. self.state = 'PENDING'
  14. self.info = {}
  15. def ready(self) -> bool:
  16. return self.state in ['SUCCESS', 'FAILURE']
  17. def successful(self) -> bool:
  18. return self.state == 'SUCCESS'
  19. def failed(self) -> bool:
  20. return self.state == 'FAILURE'
  21. class MockCeleryTask:
  22. """模拟Celery任务"""
  23. def __init__(self, func):
  24. self.func = func
  25. self._results = {}
  26. def delay(self, *args, **kwargs):
  27. """模拟异步执行"""
  28. task_id = str(uuid.uuid4())
  29. # 创建结果对象
  30. result = MockAsyncResult(task_id)
  31. self._results[task_id] = result
  32. # 在后台线程中执行任务
  33. def run_task():
  34. try:
  35. result.state = 'PROGRESS'
  36. result.info = {'progress': 0}
  37. # 执行实际任务
  38. task_result = self.func(*args, **kwargs)
  39. result.state = 'SUCCESS'
  40. result.info = task_result
  41. except Exception as e:
  42. result.state = 'FAILURE'
  43. result.info = {'error': str(e)}
  44. thread = threading.Thread(target=run_task)
  45. thread.daemon = True
  46. thread.start()
  47. return result
  48. def apply_async(self, args=None, kwargs=None):
  49. """模拟apply_async"""
  50. args = args or []
  51. kwargs = kwargs or {}
  52. return self.delay(*args, **kwargs)
  53. def mock_scan_aws_resources(task_id: int, credential_ids: List[int],
  54. regions: List[str], project_metadata: Dict[str, Any]) -> Dict[str, Any]:
  55. """
  56. 模拟AWS资源扫描任务
  57. 这个函数会模拟扫描过程,但不会实际调用AWS API
  58. """
  59. from app import db
  60. from app.models import Task, TaskLog, Report
  61. import os
  62. print(f"🔄 Mock: 开始扫描任务 {task_id}")
  63. # 更新任务状态
  64. task = db.session.get(Task, task_id)
  65. if not task:
  66. return {'status': 'error', 'message': 'Task not found'}
  67. try:
  68. task.status = 'running'
  69. task.progress = 0
  70. db.session.commit()
  71. # 添加开始日志
  72. log = TaskLog(
  73. task_id=task_id,
  74. level='info',
  75. message='Mock scan started',
  76. details='{"mode": "mock", "credentials": ' + str(len(credential_ids)) + ', "regions": ' + str(len(regions)) + '}'
  77. )
  78. db.session.add(log)
  79. db.session.commit()
  80. # 模拟扫描过程
  81. steps = [
  82. (20, "初始化扫描环境"),
  83. (40, "扫描VPC资源"),
  84. (60, "扫描EC2实例"),
  85. (80, "扫描RDS数据库"),
  86. (100, "生成报告")
  87. ]
  88. for progress, message in steps:
  89. time.sleep(2) # 模拟扫描时间
  90. task.progress = progress
  91. # 添加进度日志
  92. log = TaskLog(
  93. task_id=task_id,
  94. level='info',
  95. message=f'Mock: {message}',
  96. details=f'{{"progress": {progress}}}'
  97. )
  98. db.session.add(log)
  99. db.session.commit()
  100. print(f"🔄 Mock: {message} ({progress}%)")
  101. # 模拟生成报告
  102. reports_folder = 'reports'
  103. os.makedirs(reports_folder, exist_ok=True)
  104. report_filename = f"mock-report-{task_id}-{int(time.time())}.docx"
  105. report_path = os.path.join(reports_folder, report_filename)
  106. # 创建一个简单的文本文件作为模拟报告
  107. with open(report_path, 'w', encoding='utf-8') as f:
  108. f.write(f"""Mock AWS Resource Scan Report
  109. Task ID: {task_id}
  110. Project: {project_metadata.get('projectName', 'Unknown')}
  111. Client: {project_metadata.get('clientName', 'Unknown')}
  112. Regions: {', '.join(regions)}
  113. Credentials: {len(credential_ids)} accounts
  114. This is a mock report generated for testing purposes.
  115. No actual AWS resources were scanned.
  116. Generated at: {time.strftime('%Y-%m-%d %H:%M:%S')}
  117. """)
  118. # 创建报告记录
  119. report = Report(
  120. task_id=task_id,
  121. file_name=report_filename,
  122. file_path=report_path,
  123. file_size=os.path.getsize(report_path)
  124. )
  125. db.session.add(report)
  126. # 完成任务
  127. task.status = 'completed'
  128. task.progress = 100
  129. # 添加完成日志
  130. log = TaskLog(
  131. task_id=task_id,
  132. level='info',
  133. message='Mock scan completed successfully',
  134. details=f'{{"report_path": "{report_path}", "file_size": {report.file_size}}}'
  135. )
  136. db.session.add(log)
  137. db.session.commit()
  138. print(f"✅ Mock: 任务 {task_id} 完成")
  139. return {
  140. 'status': 'success',
  141. 'message': 'Mock scan completed',
  142. 'resources_found': 42, # 模拟找到的资源数量
  143. 'report_path': report_path
  144. }
  145. except Exception as e:
  146. # 处理错误
  147. task.status = 'failed'
  148. # 添加错误日志
  149. log = TaskLog(
  150. task_id=task_id,
  151. level='error',
  152. message=f'Mock scan failed: {str(e)}',
  153. details=f'{{"error_type": "{type(e).__name__}", "error_message": "{str(e)}"}}'
  154. )
  155. db.session.add(log)
  156. db.session.commit()
  157. print(f"❌ Mock: 任务 {task_id} 失败: {e}")
  158. return {
  159. 'status': 'error',
  160. 'message': f'Mock scan failed: {str(e)}'
  161. }
  162. # 创建模拟任务
  163. scan_aws_resources = MockCeleryTask(mock_scan_aws_resources)