report_service.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. """
  2. Report Service
  3. This module provides high-level report management functionality,
  4. integrating the ReportGenerator with database storage.
  5. """
  6. import os
  7. from datetime import datetime
  8. from typing import Dict, List, Any, Optional
  9. from flask import current_app
  10. from app import db
  11. from app.models import Report, Task
  12. from app.services.report_generator import ReportGenerator, generate_report_filename
  13. class ReportService:
  14. """
  15. Service for managing report generation and storage.
  16. This service:
  17. - Coordinates report generation from scan results
  18. - Stores report metadata in the database
  19. - Manages report file storage
  20. """
  21. @staticmethod
  22. def get_reports_folder() -> str:
  23. """Get the reports folder path from config (always returns absolute path)."""
  24. reports_folder = current_app.config.get('REPORTS_FOLDER', 'reports')
  25. if not os.path.isabs(reports_folder):
  26. # Convert to absolute path relative to the app root
  27. reports_folder = os.path.abspath(reports_folder)
  28. return reports_folder
  29. @staticmethod
  30. def get_uploads_folder() -> str:
  31. """Get the uploads folder path from config."""
  32. return current_app.config.get('UPLOAD_FOLDER', 'uploads')
  33. @classmethod
  34. def generate_and_store_report(cls, task_id: int, scan_results: Dict[str, List[Dict[str, Any]]],
  35. project_metadata: Dict[str, Any],
  36. network_diagram_path: str = None,
  37. regions: List[str] = None) -> Report:
  38. """
  39. Generate a report and store it in the database.
  40. Args:
  41. task_id: ID of the associated task
  42. scan_results: Dictionary mapping service keys to lists of resources
  43. project_metadata: Project metadata for the report
  44. network_diagram_path: Optional path to network diagram image
  45. regions: Optional list of regions being scanned (for multi-region heading display)
  46. Returns:
  47. Created Report model instance
  48. Raises:
  49. ValueError: If task doesn't exist or already has a report
  50. """
  51. # Verify task exists
  52. task = Task.query.get(task_id)
  53. if not task:
  54. raise ValueError(f"Task {task_id} not found")
  55. # Check if task already has a report
  56. existing_report = Report.query.filter_by(task_id=task_id).first()
  57. if existing_report:
  58. raise ValueError(f"Task {task_id} already has a report")
  59. # Generate filename and output path
  60. filename = generate_report_filename(project_metadata)
  61. reports_folder = cls.get_reports_folder()
  62. os.makedirs(reports_folder, exist_ok=True)
  63. output_path = os.path.join(reports_folder, filename)
  64. # Generate the report
  65. generator = ReportGenerator()
  66. result = generator.generate_report(
  67. scan_results=scan_results,
  68. project_metadata=project_metadata,
  69. output_path=output_path,
  70. network_diagram_path=network_diagram_path,
  71. regions=regions
  72. )
  73. # Create database record
  74. report = Report(
  75. task_id=task_id,
  76. file_name=result['file_name'],
  77. file_path=result['file_path'],
  78. file_size=result['file_size']
  79. )
  80. db.session.add(report)
  81. db.session.commit()
  82. return report
  83. @classmethod
  84. def get_report_by_id(cls, report_id: int) -> Optional[Report]:
  85. """
  86. Get a report by ID.
  87. Args:
  88. report_id: ID of the report
  89. Returns:
  90. Report instance or None
  91. """
  92. return Report.query.get(report_id)
  93. @classmethod
  94. def get_report_by_task_id(cls, task_id: int) -> Optional[Report]:
  95. """
  96. Get a report by task ID.
  97. Args:
  98. task_id: ID of the associated task
  99. Returns:
  100. Report instance or None
  101. """
  102. return Report.query.filter_by(task_id=task_id).first()
  103. @classmethod
  104. def get_reports(cls, page: int = 1, page_size: int = 20,
  105. task_id: int = None, user_id: int = None) -> Dict[str, Any]:
  106. """
  107. Get paginated list of reports.
  108. Args:
  109. page: Page number (1-indexed)
  110. page_size: Number of items per page
  111. task_id: Optional filter by task ID
  112. user_id: Optional filter by user ID (reports from user's tasks)
  113. Returns:
  114. Dictionary with 'data' and 'pagination' keys
  115. """
  116. query = Report.query
  117. if task_id:
  118. query = query.filter_by(task_id=task_id)
  119. if user_id:
  120. query = query.join(Task).filter(Task.created_by == user_id)
  121. query = query.order_by(Report.created_at.desc())
  122. # Paginate
  123. total = query.count()
  124. reports = query.offset((page - 1) * page_size).limit(page_size).all()
  125. return {
  126. 'data': [r.to_dict() for r in reports],
  127. 'pagination': {
  128. 'page': page,
  129. 'page_size': page_size,
  130. 'total': total,
  131. 'total_pages': (total + page_size - 1) // page_size
  132. }
  133. }
  134. @classmethod
  135. def delete_report(cls, report_id: int) -> bool:
  136. """
  137. Delete a report and its file.
  138. Args:
  139. report_id: ID of the report to delete
  140. Returns:
  141. True if deleted, False if not found
  142. """
  143. report = Report.query.get(report_id)
  144. if not report:
  145. return False
  146. # Delete the file
  147. if report.file_path and os.path.exists(report.file_path):
  148. try:
  149. os.remove(report.file_path)
  150. except OSError:
  151. pass # File may already be deleted
  152. # Delete database record
  153. db.session.delete(report)
  154. db.session.commit()
  155. return True
  156. @classmethod
  157. def get_report_file_path(cls, report_id: int) -> Optional[str]:
  158. """
  159. Get the file path for a report.
  160. Args:
  161. report_id: ID of the report
  162. Returns:
  163. File path or None if report not found or file doesn't exist
  164. """
  165. report = Report.query.get(report_id)
  166. if not report or not report.file_path:
  167. return None
  168. file_path = report.file_path
  169. # If the stored path exists, return it
  170. if os.path.exists(file_path):
  171. return file_path
  172. # If stored path is relative, try to resolve it
  173. if not os.path.isabs(file_path):
  174. # Try relative to current working directory
  175. abs_path = os.path.abspath(file_path)
  176. if os.path.exists(abs_path):
  177. return abs_path
  178. # Try to find the file in the reports folder by filename
  179. reports_folder = cls.get_reports_folder()
  180. filename = os.path.basename(file_path)
  181. fallback_path = os.path.join(reports_folder, filename)
  182. if os.path.exists(fallback_path):
  183. return fallback_path
  184. return None
  185. @classmethod
  186. def regenerate_report(cls, task_id: int, scan_results: Dict[str, List[Dict[str, Any]]],
  187. project_metadata: Dict[str, Any],
  188. network_diagram_path: str = None,
  189. regions: List[str] = None) -> Report:
  190. """
  191. Regenerate a report for a task, replacing the existing one.
  192. Args:
  193. task_id: ID of the associated task
  194. scan_results: Dictionary mapping service keys to lists of resources
  195. project_metadata: Project metadata for the report
  196. network_diagram_path: Optional path to network diagram image
  197. regions: Optional list of regions being scanned (for multi-region heading display)
  198. Returns:
  199. Updated Report model instance
  200. """
  201. # Delete existing report if any
  202. existing_report = Report.query.filter_by(task_id=task_id).first()
  203. if existing_report:
  204. cls.delete_report(existing_report.id)
  205. # Generate new report
  206. return cls.generate_and_store_report(
  207. task_id=task_id,
  208. scan_results=scan_results,
  209. project_metadata=project_metadata,
  210. network_diagram_path=network_diagram_path,
  211. regions=regions
  212. )