"""
Report Service

This module provides high-level report management functionality,
integrating the ReportGenerator with database storage.
"""

import os
from datetime import datetime
from typing import Dict, List, Any, Optional

from flask import current_app

from app import db
from app.models import Report, Task
from app.services.report_generator import ReportGenerator, generate_report_filename


class ReportService:
    """
    Service for managing report generation and storage.

    This service:
    - Coordinates report generation from scan results
    - Stores report metadata in the database
    - Manages report file storage

    All methods need an active Flask application context because they read
    ``current_app.config`` and/or use the ``db`` session.
    """

    @staticmethod
    def get_reports_folder() -> str:
        """
        Get the reports folder path from config (always returns absolute path).

        Reads ``REPORTS_FOLDER`` from the app config, defaulting to
        ``'reports'``. A relative value is resolved with ``os.path.abspath``,
        i.e. against the process's current working directory.
        """
        reports_folder = current_app.config.get('REPORTS_FOLDER', 'reports')
        if not os.path.isabs(reports_folder):
            # os.path.abspath anchors on the current working directory, not on
            # the application root.
            reports_folder = os.path.abspath(reports_folder)
        return reports_folder

    @staticmethod
    def get_uploads_folder() -> str:
        """Get the uploads folder path from config (``UPLOAD_FOLDER``, default ``'uploads'``)."""
        return current_app.config.get('UPLOAD_FOLDER', 'uploads')

    @staticmethod
    def _remove_file(file_path: Optional[str]) -> None:
        """
        Best-effort removal of a report file.

        A missing file is not an error. Any other OS-level failure is logged
        as a warning instead of being raised, so callers never fail because
        of file cleanup.
        """
        if not file_path:
            return
        try:
            os.remove(file_path)
        except FileNotFoundError:
            pass  # File may already be deleted
        except OSError as exc:
            current_app.logger.warning(
                "Could not remove report file %s: %s", file_path, exc
            )

    @classmethod
    def generate_and_store_report(
        cls,
        task_id: int,
        scan_results: Dict[str, List[Dict[str, Any]]],
        project_metadata: Dict[str, Any],
        network_diagram_path: Optional[str] = None,
        regions: Optional[List[str]] = None,
    ) -> Report:
        """
        Generate a report and store it in the database.

        Args:
            task_id: ID of the associated task
            scan_results: Dictionary mapping service keys to lists of resources
            project_metadata: Project metadata for the report
            network_diagram_path: Optional path to network diagram image
            regions: Optional list of regions being scanned
                (for multi-region heading display)

        Returns:
            Created Report model instance

        Raises:
            ValueError: If task doesn't exist or already has a report
            Exception: Whatever the database commit raises; in that case the
                session is rolled back and the generated file is removed
                before the error is re-raised.
        """
        # Verify task exists
        task = Task.query.get(task_id)
        if not task:
            raise ValueError(f"Task {task_id} not found")

        # Check if task already has a report
        existing_report = Report.query.filter_by(task_id=task_id).first()
        if existing_report:
            raise ValueError(f"Task {task_id} already has a report")

        # Generate filename and output path
        filename = generate_report_filename(project_metadata)
        reports_folder = cls.get_reports_folder()
        os.makedirs(reports_folder, exist_ok=True)
        output_path = os.path.join(reports_folder, filename)

        # Generate the report
        generator = ReportGenerator()
        result = generator.generate_report(
            scan_results=scan_results,
            project_metadata=project_metadata,
            output_path=output_path,
            network_diagram_path=network_diagram_path,
            regions=regions,
        )

        # Create database record
        report = Report(
            task_id=task_id,
            file_name=result['file_name'],
            file_path=result['file_path'],
            file_size=result['file_size'],
        )
        db.session.add(report)
        try:
            db.session.commit()
        except Exception:
            # Do not leave a half-finished transaction or an orphaned file
            # behind when the record could not be persisted.
            db.session.rollback()
            cls._remove_file(result['file_path'])
            raise

        return report

    @classmethod
    def get_report_by_id(cls, report_id: int) -> Optional[Report]:
        """
        Get a report by ID.

        Args:
            report_id: ID of the report

        Returns:
            Report instance or None
        """
        return Report.query.get(report_id)

    @classmethod
    def get_report_by_task_id(cls, task_id: int) -> Optional[Report]:
        """
        Get a report by task ID.

        Args:
            task_id: ID of the associated task

        Returns:
            Report instance or None
        """
        return Report.query.filter_by(task_id=task_id).first()

    @classmethod
    def get_reports(
        cls,
        page: int = 1,
        page_size: int = 20,
        task_id: Optional[int] = None,
        user_id: Optional[int] = None,
    ) -> Dict[str, Any]:
        """
        Get paginated list of reports, newest first.

        Args:
            page: Page number (1-indexed); values below 1 are treated as 1
            page_size: Number of items per page; values below 1 are treated
                as 1
            task_id: Optional filter by task ID
            user_id: Optional filter by user ID (reports from user's tasks)

        Returns:
            Dictionary with 'data' and 'pagination' keys. The pagination
            entry echoes the effective (clamped) page and page_size.
        """
        # Clamp so that a zero page size cannot cause a division by zero and
        # a non-positive page cannot produce a negative OFFSET.
        page = max(page, 1)
        page_size = max(page_size, 1)

        query = Report.query

        if task_id:
            query = query.filter_by(task_id=task_id)

        if user_id:
            query = query.join(Task).filter(Task.created_by == user_id)

        query = query.order_by(Report.created_at.desc())

        # Paginate
        total = query.count()
        reports = query.offset((page - 1) * page_size).limit(page_size).all()

        return {
            'data': [r.to_dict() for r in reports],
            'pagination': {
                'page': page,
                'page_size': page_size,
                'total': total,
                # Ceiling division without floats
                'total_pages': (total + page_size - 1) // page_size,
            },
        }

    @classmethod
    def delete_report(cls, report_id: int) -> bool:
        """
        Delete a report and its file.

        The database record is removed first; the file is only deleted once
        the commit has succeeded, so a failed commit never leaves a record
        pointing at a file that no longer exists.

        Args:
            report_id: ID of the report to delete

        Returns:
            True if deleted, False if not found

        Raises:
            Exception: Whatever the database commit raises (after rollback).
        """
        report = Report.query.get(report_id)
        if not report:
            return False

        # Read the path before the instance is deleted from the session.
        file_path = report.file_path

        # Delete database record
        db.session.delete(report)
        try:
            db.session.commit()
        except Exception:
            db.session.rollback()
            raise

        # Delete the file (best effort)
        cls._remove_file(file_path)

        return True

    @classmethod
    def get_report_file_path(cls, report_id: int) -> Optional[str]:
        """
        Get the file path for a report.

        Resolution order:
        1. the stored path as-is;
        2. if the stored path is relative, its absolute form relative to the
           current working directory;
        3. a file with the same basename inside the reports folder.

        Args:
            report_id: ID of the report

        Returns:
            File path or None if report not found or file doesn't exist
        """
        report = Report.query.get(report_id)
        if not report or not report.file_path:
            return None

        file_path = report.file_path

        # If the stored path exists, return it
        if os.path.exists(file_path):
            return file_path

        # If stored path is relative, try to resolve it
        if not os.path.isabs(file_path):
            # Try relative to current working directory
            abs_path = os.path.abspath(file_path)
            if os.path.exists(abs_path):
                return abs_path

        # Try to find the file in the reports folder by filename
        reports_folder = cls.get_reports_folder()
        filename = os.path.basename(file_path)
        fallback_path = os.path.join(reports_folder, filename)
        if os.path.exists(fallback_path):
            return fallback_path

        return None

    @classmethod
    def regenerate_report(
        cls,
        task_id: int,
        scan_results: Dict[str, List[Dict[str, Any]]],
        project_metadata: Dict[str, Any],
        network_diagram_path: Optional[str] = None,
        regions: Optional[List[str]] = None,
    ) -> Report:
        """
        Regenerate a report for a task, replacing the existing one.

        The existing report (record and file) is deleted before the new one
        is generated, so if generation fails the previous report is not
        restored.

        Args:
            task_id: ID of the associated task
            scan_results: Dictionary mapping service keys to lists of resources
            project_metadata: Project metadata for the report
            network_diagram_path: Optional path to network diagram image
            regions: Optional list of regions being scanned
                (for multi-region heading display)

        Returns:
            Updated Report model instance

        Raises:
            ValueError: If the task doesn't exist
        """
        # Fail before deleting anything when the task is missing; generation
        # would reject it with this same error after the old report was gone.
        if not Task.query.get(task_id):
            raise ValueError(f"Task {task_id} not found")

        # Delete existing report if any
        existing_report = Report.query.filter_by(task_id=task_id).first()
        if existing_report:
            cls.delete_report(existing_report.id)

        # Generate new report
        return cls.generate_and_store_report(
            task_id=task_id,
            scan_results=scan_results,
            project_metadata=project_metadata,
            network_diagram_path=network_diagram_path,
            regions=regions,
        )