""" Unified Error Handling Module This module provides: - Custom exception classes for different error types - Global exception handlers for Flask - Unified error response format - Error logging with timestamp, context, and stack trace Requirements: - 8.1: Log errors with timestamp, context, and stack trace - 8.5: Gracefully handle critical errors without crashing """ import traceback import logging from datetime import datetime, timezone from functools import wraps from typing import Optional, Dict, Any from flask import jsonify, request, current_app # Configure module logger logger = logging.getLogger(__name__) # ==================== Error Codes ==================== class ErrorCode: """Standardized error codes for the application""" # Authentication errors (401) AUTHENTICATION_FAILED = 'AUTHENTICATION_FAILED' TOKEN_EXPIRED = 'TOKEN_EXPIRED' TOKEN_INVALID = 'TOKEN_INVALID' # Authorization errors (403) ACCESS_DENIED = 'ACCESS_DENIED' INSUFFICIENT_PERMISSIONS = 'INSUFFICIENT_PERMISSIONS' # Validation errors (400) VALIDATION_ERROR = 'VALIDATION_ERROR' MISSING_REQUIRED_FIELD = 'MISSING_REQUIRED_FIELD' INVALID_FIELD_FORMAT = 'INVALID_FIELD_FORMAT' # Resource errors (404) RESOURCE_NOT_FOUND = 'RESOURCE_NOT_FOUND' USER_NOT_FOUND = 'USER_NOT_FOUND' CREDENTIAL_NOT_FOUND = 'CREDENTIAL_NOT_FOUND' TASK_NOT_FOUND = 'TASK_NOT_FOUND' REPORT_NOT_FOUND = 'REPORT_NOT_FOUND' # AWS errors AWS_CREDENTIAL_INVALID = 'AWS_CREDENTIAL_INVALID' AWS_API_ERROR = 'AWS_API_ERROR' AWS_RATE_LIMITED = 'AWS_RATE_LIMITED' AWS_ACCESS_DENIED = 'AWS_ACCESS_DENIED' # System errors (500) INTERNAL_ERROR = 'INTERNAL_ERROR' DATABASE_ERROR = 'DATABASE_ERROR' FILE_SYSTEM_ERROR = 'FILE_SYSTEM_ERROR' WORKER_ERROR = 'WORKER_ERROR' # Task errors TASK_EXECUTION_ERROR = 'TASK_EXECUTION_ERROR' TASK_TIMEOUT = 'TASK_TIMEOUT' SCAN_ERROR = 'SCAN_ERROR' REPORT_GENERATION_ERROR = 'REPORT_GENERATION_ERROR' # ==================== Custom Exception Classes ==================== class AppError(Exception): """ Base application error class. All custom exceptions should inherit from this class. Provides consistent error structure with code, message, status_code, and details. """ def __init__( self, message: str, code: str = ErrorCode.INTERNAL_ERROR, status_code: int = 400, details: Optional[Dict[str, Any]] = None, original_exception: Optional[Exception] = None ): super().__init__(message) self.message = message self.code = code self.status_code = status_code self.details = details or {} self.original_exception = original_exception self.timestamp = datetime.now(timezone.utc) # Capture stack trace if original exception provided if original_exception: self.stack_trace = traceback.format_exception( type(original_exception), original_exception, original_exception.__traceback__ ) else: self.stack_trace = traceback.format_stack() def to_dict(self) -> Dict[str, Any]: """Convert error to dictionary for JSON response""" error_dict = { 'code': self.code, 'message': self.message, 'timestamp': self.timestamp.isoformat() } if self.details: error_dict['details'] = self.details return error_dict def to_log_dict(self) -> Dict[str, Any]: """Convert error to dictionary for logging (includes stack trace)""" log_dict = self.to_dict() log_dict['status_code'] = self.status_code log_dict['stack_trace'] = ''.join(self.stack_trace) if self.stack_trace else None return log_dict class AuthenticationError(AppError): """Authentication related errors (401)""" def __init__( self, message: str = "Authentication failed", code: str = ErrorCode.AUTHENTICATION_FAILED, details: Optional[Dict[str, Any]] = None, original_exception: Optional[Exception] = None ): super().__init__( message=message, code=code, status_code=401, details=details, original_exception=original_exception ) class AuthorizationError(AppError): """Authorization related errors (403)""" def __init__( self, message: str = "Access denied", code: str = ErrorCode.ACCESS_DENIED, details: Optional[Dict[str, Any]] = None, original_exception: Optional[Exception] = None ): super().__init__( message=message, code=code, status_code=403, details=details, original_exception=original_exception ) class NotFoundError(AppError): """Resource not found errors (404)""" def __init__( self, message: str = "Resource not found", code: str = ErrorCode.RESOURCE_NOT_FOUND, details: Optional[Dict[str, Any]] = None, original_exception: Optional[Exception] = None ): super().__init__( message=message, code=code, status_code=404, details=details, original_exception=original_exception ) class ValidationError(AppError): """Validation errors (400)""" def __init__( self, message: str = "Validation failed", code: str = ErrorCode.VALIDATION_ERROR, details: Optional[Dict[str, Any]] = None, original_exception: Optional[Exception] = None ): super().__init__( message=message, code=code, status_code=400, details=details, original_exception=original_exception ) class AWSError(AppError): """AWS related errors""" def __init__( self, message: str = "AWS operation failed", code: str = ErrorCode.AWS_API_ERROR, status_code: int = 500, details: Optional[Dict[str, Any]] = None, original_exception: Optional[Exception] = None ): super().__init__( message=message, code=code, status_code=status_code, details=details, original_exception=original_exception ) class DatabaseError(AppError): """Database related errors""" def __init__( self, message: str = "Database operation failed", code: str = ErrorCode.DATABASE_ERROR, details: Optional[Dict[str, Any]] = None, original_exception: Optional[Exception] = None ): super().__init__( message=message, code=code, status_code=500, details=details, original_exception=original_exception ) class TaskError(AppError): """Task execution related errors""" def __init__( self, message: str = "Task execution failed", code: str = ErrorCode.TASK_EXECUTION_ERROR, status_code: int = 500, details: Optional[Dict[str, Any]] = None, original_exception: Optional[Exception] = None, task_id: Optional[int] = None ): if task_id: details = details or {} details['task_id'] = task_id super().__init__( message=message, code=code, status_code=status_code, details=details, original_exception=original_exception ) self.task_id = task_id class ScanError(TaskError): """Scan operation related errors""" def __init__( self, message: str = "Scan operation failed", code: str = ErrorCode.SCAN_ERROR, details: Optional[Dict[str, Any]] = None, original_exception: Optional[Exception] = None, task_id: Optional[int] = None, service: Optional[str] = None, region: Optional[str] = None ): details = details or {} if service: details['service'] = service if region: details['region'] = region super().__init__( message=message, code=code, status_code=500, details=details, original_exception=original_exception, task_id=task_id ) self.service = service self.region = region class ReportGenerationError(TaskError): """Report generation related errors""" def __init__( self, message: str = "Report generation failed", code: str = ErrorCode.REPORT_GENERATION_ERROR, details: Optional[Dict[str, Any]] = None, original_exception: Optional[Exception] = None, task_id: Optional[int] = None ): super().__init__( message=message, code=code, status_code=500, details=details, original_exception=original_exception, task_id=task_id ) # ==================== Error Logging Utilities ==================== def log_error( error: Exception, context: Optional[Dict[str, Any]] = None, level: str = 'error' ) -> Dict[str, Any]: """ Log an error with full context and stack trace. Requirements: - 8.1: Log errors with timestamp, context, and stack trace Args: error: The exception to log context: Additional context information level: Log level ('error', 'warning', 'critical') Returns: Dictionary containing the logged error information """ timestamp = datetime.now(timezone.utc) # Build error log entry log_entry = { 'timestamp': timestamp.isoformat(), 'error_type': type(error).__name__, 'message': str(error), 'context': context or {} } # Add request context if available try: if request: log_entry['request'] = { 'method': request.method, 'path': request.path, 'remote_addr': request.remote_addr, 'user_agent': str(request.user_agent) if request.user_agent else None } except RuntimeError: # Outside of request context pass # Add stack trace if isinstance(error, AppError): log_entry['code'] = error.code log_entry['status_code'] = error.status_code log_entry['details'] = error.details log_entry['stack_trace'] = ''.join(error.stack_trace) if error.stack_trace else None else: log_entry['stack_trace'] = traceback.format_exc() # Log at appropriate level log_message = f"[{log_entry['error_type']}] {log_entry['message']}" if level == 'critical': logger.critical(log_message, extra={'error_details': log_entry}) elif level == 'warning': logger.warning(log_message, extra={'error_details': log_entry}) else: logger.error(log_message, extra={'error_details': log_entry}) return log_entry def create_error_response( code: str, message: str, status_code: int = 400, details: Optional[Dict[str, Any]] = None ) -> tuple: """ Create a standardized error response. Args: code: Error code message: Human-readable error message status_code: HTTP status code details: Additional error details Returns: Tuple of (response_json, status_code) """ response = { 'error': { 'code': code, 'message': message, 'timestamp': datetime.now(timezone.utc).isoformat() } } if details: response['error']['details'] = details return jsonify(response), status_code # ==================== Error Handler Decorator ==================== def handle_errors(func): """ Decorator to handle errors in route handlers. Catches exceptions and converts them to appropriate error responses. Logs all errors with full context. Requirements: - 8.5: Gracefully handle critical errors without crashing """ @wraps(func) def wrapper(*args, **kwargs): try: return func(*args, **kwargs) except AppError as e: # Log the error log_error(e, context={'handler': func.__name__}) # Return error response return jsonify({'error': e.to_dict()}), e.status_code except Exception as e: # Log unexpected error log_error(e, context={'handler': func.__name__}, level='critical') # Return generic error response return create_error_response( code=ErrorCode.INTERNAL_ERROR, message='An unexpected error occurred', status_code=500 ) return wrapper # ==================== Global Error Handlers ==================== def register_error_handlers(app): """ Register global error handlers for the Flask app. Requirements: - 8.5: Gracefully handle critical errors without crashing """ @app.errorhandler(AppError) def handle_app_error(error): """Handle custom application errors""" log_error(error) return jsonify({'error': error.to_dict()}), error.status_code @app.errorhandler(400) def handle_bad_request(error): """Handle 400 Bad Request errors""" log_entry = log_error(error, context={'type': 'bad_request'}, level='warning') return create_error_response( code=ErrorCode.VALIDATION_ERROR, message='Bad request', status_code=400, details={'description': str(error.description) if hasattr(error, 'description') else None} ) @app.errorhandler(401) def handle_unauthorized(error): """Handle 401 Unauthorized errors""" log_entry = log_error(error, context={'type': 'unauthorized'}, level='warning') return create_error_response( code=ErrorCode.AUTHENTICATION_FAILED, message='Authentication required', status_code=401 ) @app.errorhandler(403) def handle_forbidden(error): """Handle 403 Forbidden errors""" log_entry = log_error(error, context={'type': 'forbidden'}, level='warning') return create_error_response( code=ErrorCode.ACCESS_DENIED, message='Access denied', status_code=403 ) @app.errorhandler(404) def handle_not_found(error): """Handle 404 Not Found errors""" log_entry = log_error(error, context={'type': 'not_found'}, level='warning') return create_error_response( code=ErrorCode.RESOURCE_NOT_FOUND, message='Resource not found', status_code=404 ) @app.errorhandler(405) def handle_method_not_allowed(error): """Handle 405 Method Not Allowed errors""" log_entry = log_error(error, context={'type': 'method_not_allowed'}, level='warning') return create_error_response( code='METHOD_NOT_ALLOWED', message='Method not allowed', status_code=405 ) @app.errorhandler(500) def handle_internal_error(error): """Handle 500 Internal Server errors""" log_entry = log_error(error, context={'type': 'internal_error'}, level='critical') return create_error_response( code=ErrorCode.INTERNAL_ERROR, message='An internal error occurred', status_code=500 ) @app.errorhandler(Exception) def handle_unexpected_error(error): """ Handle any unexpected exceptions. Requirements: - 8.5: Gracefully handle critical errors without crashing """ log_entry = log_error(error, context={'type': 'unexpected'}, level='critical') return create_error_response( code=ErrorCode.INTERNAL_ERROR, message='An unexpected error occurred', status_code=500 ) # ==================== Task Error Logging ==================== class TaskErrorLogger: """ Utility class for logging errors associated with tasks. Requirements: - 8.2: Record error details in task record - 8.3: Display error logs associated with task """ @staticmethod def log_task_error( task_id: int, error: Exception, service: Optional[str] = None, region: Optional[str] = None, context: Optional[Dict[str, Any]] = None ) -> None: """ Log an error for a specific task. Args: task_id: The task ID to associate the error with error: The exception that occurred service: Optional AWS service name region: Optional AWS region context: Additional context information """ from app import db from app.models import TaskLog import json # Build error details details = { 'error_type': type(error).__name__, 'timestamp': datetime.now(timezone.utc).isoformat() } if service: details['service'] = service if region: details['region'] = region if context: details['context'] = context # Add stack trace if isinstance(error, AppError): details['code'] = error.code details['stack_trace'] = ''.join(error.stack_trace) if error.stack_trace else None else: details['stack_trace'] = traceback.format_exc() # Create task log entry log_entry = TaskLog( task_id=task_id, level='error', message=str(error), details=json.dumps(details) ) db.session.add(log_entry) db.session.commit() # Also log to application logger log_error(error, context={'task_id': task_id, 'service': service, 'region': region}) @staticmethod def log_task_warning( task_id: int, message: str, details: Optional[Dict[str, Any]] = None ) -> None: """Log a warning for a specific task""" from app import db from app.models import TaskLog import json log_entry = TaskLog( task_id=task_id, level='warning', message=message, details=json.dumps(details) if details else None ) db.session.add(log_entry) db.session.commit() @staticmethod def log_task_info( task_id: int, message: str, details: Optional[Dict[str, Any]] = None ) -> None: """Log an info message for a specific task""" from app import db from app.models import TaskLog import json log_entry = TaskLog( task_id=task_id, level='info', message=message, details=json.dumps(details) if details else None ) db.session.add(log_entry) db.session.commit() @staticmethod def get_task_errors(task_id: int) -> list: """ Get all error logs for a specific task. Requirements: - 8.3: Display error logs associated with task """ from app.models import TaskLog logs = TaskLog.query.filter_by( task_id=task_id, level='error' ).order_by(TaskLog.created_at.desc()).all() return [log.to_dict() for log in logs] @staticmethod def get_task_logs( task_id: int, level: Optional[str] = None, page: int = 1, page_size: int = 20 ) -> Dict[str, Any]: """ Get paginated logs for a specific task. Args: task_id: The task ID level: Optional filter by log level page: Page number page_size: Number of items per page Returns: Dictionary with logs and pagination info """ from app.models import TaskLog query = TaskLog.query.filter_by(task_id=task_id) if level: query = query.filter_by(level=level) query = query.order_by(TaskLog.created_at.desc()) # Paginate total = query.count() logs = query.offset((page - 1) * page_size).limit(page_size).all() return { 'data': [log.to_dict() for log in logs], 'pagination': { 'page': page, 'page_size': page_size, 'total': total, 'total_pages': (total + page_size - 1) // page_size } }