| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705 |
- """
- Unified Error Handling Module
- This module provides:
- - Custom exception classes for different error types
- - Global exception handlers for Flask
- - Unified error response format
- - Error logging with timestamp, context, and stack trace
- Requirements:
- - 8.1: Log errors with timestamp, context, and stack trace
- - 8.5: Gracefully handle critical errors without crashing
- """
- import traceback
- import logging
- from datetime import datetime, timezone
- from functools import wraps
- from typing import Optional, Dict, Any
- from flask import jsonify, request, current_app
- # Configure module logger
- logger = logging.getLogger(__name__)
- # ==================== Error Codes ====================
- class ErrorCode:
- """Standardized error codes for the application"""
- # Authentication errors (401)
- AUTHENTICATION_FAILED = 'AUTHENTICATION_FAILED'
- TOKEN_EXPIRED = 'TOKEN_EXPIRED'
- TOKEN_INVALID = 'TOKEN_INVALID'
-
- # Authorization errors (403)
- ACCESS_DENIED = 'ACCESS_DENIED'
- INSUFFICIENT_PERMISSIONS = 'INSUFFICIENT_PERMISSIONS'
-
- # Validation errors (400)
- VALIDATION_ERROR = 'VALIDATION_ERROR'
- MISSING_REQUIRED_FIELD = 'MISSING_REQUIRED_FIELD'
- INVALID_FIELD_FORMAT = 'INVALID_FIELD_FORMAT'
-
- # Resource errors (404)
- RESOURCE_NOT_FOUND = 'RESOURCE_NOT_FOUND'
- USER_NOT_FOUND = 'USER_NOT_FOUND'
- CREDENTIAL_NOT_FOUND = 'CREDENTIAL_NOT_FOUND'
- TASK_NOT_FOUND = 'TASK_NOT_FOUND'
- REPORT_NOT_FOUND = 'REPORT_NOT_FOUND'
-
- # AWS errors
- AWS_CREDENTIAL_INVALID = 'AWS_CREDENTIAL_INVALID'
- AWS_API_ERROR = 'AWS_API_ERROR'
- AWS_RATE_LIMITED = 'AWS_RATE_LIMITED'
- AWS_ACCESS_DENIED = 'AWS_ACCESS_DENIED'
-
- # System errors (500)
- INTERNAL_ERROR = 'INTERNAL_ERROR'
- DATABASE_ERROR = 'DATABASE_ERROR'
- FILE_SYSTEM_ERROR = 'FILE_SYSTEM_ERROR'
- WORKER_ERROR = 'WORKER_ERROR'
-
- # Task errors
- TASK_EXECUTION_ERROR = 'TASK_EXECUTION_ERROR'
- TASK_TIMEOUT = 'TASK_TIMEOUT'
- SCAN_ERROR = 'SCAN_ERROR'
- REPORT_GENERATION_ERROR = 'REPORT_GENERATION_ERROR'
- # ==================== Custom Exception Classes ====================
- class AppError(Exception):
- """
- Base application error class.
-
- All custom exceptions should inherit from this class.
- Provides consistent error structure with code, message, status_code, and details.
- """
-
- def __init__(
- self,
- message: str,
- code: str = ErrorCode.INTERNAL_ERROR,
- status_code: int = 400,
- details: Optional[Dict[str, Any]] = None,
- original_exception: Optional[Exception] = None
- ):
- super().__init__(message)
- self.message = message
- self.code = code
- self.status_code = status_code
- self.details = details or {}
- self.original_exception = original_exception
- self.timestamp = datetime.now(timezone.utc)
-
- # Capture stack trace if original exception provided
- if original_exception:
- self.stack_trace = traceback.format_exception(
- type(original_exception),
- original_exception,
- original_exception.__traceback__
- )
- else:
- self.stack_trace = traceback.format_stack()
-
- def to_dict(self) -> Dict[str, Any]:
- """Convert error to dictionary for JSON response"""
- error_dict = {
- 'code': self.code,
- 'message': self.message,
- 'timestamp': self.timestamp.isoformat()
- }
- if self.details:
- error_dict['details'] = self.details
- return error_dict
-
- def to_log_dict(self) -> Dict[str, Any]:
- """Convert error to dictionary for logging (includes stack trace)"""
- log_dict = self.to_dict()
- log_dict['status_code'] = self.status_code
- log_dict['stack_trace'] = ''.join(self.stack_trace) if self.stack_trace else None
- return log_dict
- class AuthenticationError(AppError):
- """Authentication related errors (401)"""
-
- def __init__(
- self,
- message: str = "Authentication failed",
- code: str = ErrorCode.AUTHENTICATION_FAILED,
- details: Optional[Dict[str, Any]] = None,
- original_exception: Optional[Exception] = None
- ):
- super().__init__(
- message=message,
- code=code,
- status_code=401,
- details=details,
- original_exception=original_exception
- )
- class AuthorizationError(AppError):
- """Authorization related errors (403)"""
-
- def __init__(
- self,
- message: str = "Access denied",
- code: str = ErrorCode.ACCESS_DENIED,
- details: Optional[Dict[str, Any]] = None,
- original_exception: Optional[Exception] = None
- ):
- super().__init__(
- message=message,
- code=code,
- status_code=403,
- details=details,
- original_exception=original_exception
- )
- class NotFoundError(AppError):
- """Resource not found errors (404)"""
-
- def __init__(
- self,
- message: str = "Resource not found",
- code: str = ErrorCode.RESOURCE_NOT_FOUND,
- details: Optional[Dict[str, Any]] = None,
- original_exception: Optional[Exception] = None
- ):
- super().__init__(
- message=message,
- code=code,
- status_code=404,
- details=details,
- original_exception=original_exception
- )
- class ValidationError(AppError):
- """Validation errors (400)"""
-
- def __init__(
- self,
- message: str = "Validation failed",
- code: str = ErrorCode.VALIDATION_ERROR,
- details: Optional[Dict[str, Any]] = None,
- original_exception: Optional[Exception] = None
- ):
- super().__init__(
- message=message,
- code=code,
- status_code=400,
- details=details,
- original_exception=original_exception
- )
- class AWSError(AppError):
- """AWS related errors"""
-
- def __init__(
- self,
- message: str = "AWS operation failed",
- code: str = ErrorCode.AWS_API_ERROR,
- status_code: int = 500,
- details: Optional[Dict[str, Any]] = None,
- original_exception: Optional[Exception] = None
- ):
- super().__init__(
- message=message,
- code=code,
- status_code=status_code,
- details=details,
- original_exception=original_exception
- )
- class DatabaseError(AppError):
- """Database related errors"""
-
- def __init__(
- self,
- message: str = "Database operation failed",
- code: str = ErrorCode.DATABASE_ERROR,
- details: Optional[Dict[str, Any]] = None,
- original_exception: Optional[Exception] = None
- ):
- super().__init__(
- message=message,
- code=code,
- status_code=500,
- details=details,
- original_exception=original_exception
- )
- class TaskError(AppError):
- """Task execution related errors"""
-
- def __init__(
- self,
- message: str = "Task execution failed",
- code: str = ErrorCode.TASK_EXECUTION_ERROR,
- status_code: int = 500,
- details: Optional[Dict[str, Any]] = None,
- original_exception: Optional[Exception] = None,
- task_id: Optional[int] = None
- ):
- if task_id:
- details = details or {}
- details['task_id'] = task_id
- super().__init__(
- message=message,
- code=code,
- status_code=status_code,
- details=details,
- original_exception=original_exception
- )
- self.task_id = task_id
- class ScanError(TaskError):
- """Scan operation related errors"""
-
- def __init__(
- self,
- message: str = "Scan operation failed",
- code: str = ErrorCode.SCAN_ERROR,
- details: Optional[Dict[str, Any]] = None,
- original_exception: Optional[Exception] = None,
- task_id: Optional[int] = None,
- service: Optional[str] = None,
- region: Optional[str] = None
- ):
- details = details or {}
- if service:
- details['service'] = service
- if region:
- details['region'] = region
- super().__init__(
- message=message,
- code=code,
- status_code=500,
- details=details,
- original_exception=original_exception,
- task_id=task_id
- )
- self.service = service
- self.region = region
- class ReportGenerationError(TaskError):
- """Report generation related errors"""
-
- def __init__(
- self,
- message: str = "Report generation failed",
- code: str = ErrorCode.REPORT_GENERATION_ERROR,
- details: Optional[Dict[str, Any]] = None,
- original_exception: Optional[Exception] = None,
- task_id: Optional[int] = None
- ):
- super().__init__(
- message=message,
- code=code,
- status_code=500,
- details=details,
- original_exception=original_exception,
- task_id=task_id
- )
- # ==================== Error Logging Utilities ====================
- def log_error(
- error: Exception,
- context: Optional[Dict[str, Any]] = None,
- level: str = 'error'
- ) -> Dict[str, Any]:
- """
- Log an error with full context and stack trace.
-
- Requirements:
- - 8.1: Log errors with timestamp, context, and stack trace
-
- Args:
- error: The exception to log
- context: Additional context information
- level: Log level ('error', 'warning', 'critical')
-
- Returns:
- Dictionary containing the logged error information
- """
- timestamp = datetime.now(timezone.utc)
-
- # Build error log entry
- log_entry = {
- 'timestamp': timestamp.isoformat(),
- 'error_type': type(error).__name__,
- 'message': str(error),
- 'context': context or {}
- }
-
- # Add request context if available
- try:
- if request:
- log_entry['request'] = {
- 'method': request.method,
- 'path': request.path,
- 'remote_addr': request.remote_addr,
- 'user_agent': str(request.user_agent) if request.user_agent else None
- }
- except RuntimeError:
- # Outside of request context
- pass
-
- # Add stack trace
- if isinstance(error, AppError):
- log_entry['code'] = error.code
- log_entry['status_code'] = error.status_code
- log_entry['details'] = error.details
- log_entry['stack_trace'] = ''.join(error.stack_trace) if error.stack_trace else None
- else:
- log_entry['stack_trace'] = traceback.format_exc()
-
- # Log at appropriate level
- log_message = f"[{log_entry['error_type']}] {log_entry['message']}"
- if level == 'critical':
- logger.critical(log_message, extra={'error_details': log_entry})
- elif level == 'warning':
- logger.warning(log_message, extra={'error_details': log_entry})
- else:
- logger.error(log_message, extra={'error_details': log_entry})
-
- return log_entry
- def create_error_response(
- code: str,
- message: str,
- status_code: int = 400,
- details: Optional[Dict[str, Any]] = None
- ) -> tuple:
- """
- Create a standardized error response.
-
- Args:
- code: Error code
- message: Human-readable error message
- status_code: HTTP status code
- details: Additional error details
-
- Returns:
- Tuple of (response_json, status_code)
- """
- response = {
- 'error': {
- 'code': code,
- 'message': message,
- 'timestamp': datetime.now(timezone.utc).isoformat()
- }
- }
- if details:
- response['error']['details'] = details
-
- return jsonify(response), status_code
- # ==================== Error Handler Decorator ====================
- def handle_errors(func):
- """
- Decorator to handle errors in route handlers.
-
- Catches exceptions and converts them to appropriate error responses.
- Logs all errors with full context.
-
- Requirements:
- - 8.5: Gracefully handle critical errors without crashing
- """
- @wraps(func)
- def wrapper(*args, **kwargs):
- try:
- return func(*args, **kwargs)
- except AppError as e:
- # Log the error
- log_error(e, context={'handler': func.__name__})
- # Return error response
- return jsonify({'error': e.to_dict()}), e.status_code
- except Exception as e:
- # Log unexpected error
- log_error(e, context={'handler': func.__name__}, level='critical')
- # Return generic error response
- return create_error_response(
- code=ErrorCode.INTERNAL_ERROR,
- message='An unexpected error occurred',
- status_code=500
- )
- return wrapper
- # ==================== Global Error Handlers ====================
- def register_error_handlers(app):
- """
- Register global error handlers for the Flask app.
-
- Requirements:
- - 8.5: Gracefully handle critical errors without crashing
- """
-
- @app.errorhandler(AppError)
- def handle_app_error(error):
- """Handle custom application errors"""
- log_error(error)
- return jsonify({'error': error.to_dict()}), error.status_code
-
- @app.errorhandler(400)
- def handle_bad_request(error):
- """Handle 400 Bad Request errors"""
- log_entry = log_error(error, context={'type': 'bad_request'}, level='warning')
- return create_error_response(
- code=ErrorCode.VALIDATION_ERROR,
- message='Bad request',
- status_code=400,
- details={'description': str(error.description) if hasattr(error, 'description') else None}
- )
-
- @app.errorhandler(401)
- def handle_unauthorized(error):
- """Handle 401 Unauthorized errors"""
- log_entry = log_error(error, context={'type': 'unauthorized'}, level='warning')
- return create_error_response(
- code=ErrorCode.AUTHENTICATION_FAILED,
- message='Authentication required',
- status_code=401
- )
-
- @app.errorhandler(403)
- def handle_forbidden(error):
- """Handle 403 Forbidden errors"""
- log_entry = log_error(error, context={'type': 'forbidden'}, level='warning')
- return create_error_response(
- code=ErrorCode.ACCESS_DENIED,
- message='Access denied',
- status_code=403
- )
-
- @app.errorhandler(404)
- def handle_not_found(error):
- """Handle 404 Not Found errors"""
- log_entry = log_error(error, context={'type': 'not_found'}, level='warning')
- return create_error_response(
- code=ErrorCode.RESOURCE_NOT_FOUND,
- message='Resource not found',
- status_code=404
- )
-
- @app.errorhandler(405)
- def handle_method_not_allowed(error):
- """Handle 405 Method Not Allowed errors"""
- log_entry = log_error(error, context={'type': 'method_not_allowed'}, level='warning')
- return create_error_response(
- code='METHOD_NOT_ALLOWED',
- message='Method not allowed',
- status_code=405
- )
-
- @app.errorhandler(500)
- def handle_internal_error(error):
- """Handle 500 Internal Server errors"""
- log_entry = log_error(error, context={'type': 'internal_error'}, level='critical')
- return create_error_response(
- code=ErrorCode.INTERNAL_ERROR,
- message='An internal error occurred',
- status_code=500
- )
-
- @app.errorhandler(Exception)
- def handle_unexpected_error(error):
- """
- Handle any unexpected exceptions.
-
- Requirements:
- - 8.5: Gracefully handle critical errors without crashing
- """
- log_entry = log_error(error, context={'type': 'unexpected'}, level='critical')
- return create_error_response(
- code=ErrorCode.INTERNAL_ERROR,
- message='An unexpected error occurred',
- status_code=500
- )
- # ==================== Task Error Logging ====================
- class TaskErrorLogger:
- """
- Utility class for logging errors associated with tasks.
-
- Requirements:
- - 8.2: Record error details in task record
- - 8.3: Display error logs associated with task
- """
-
- @staticmethod
- def log_task_error(
- task_id: int,
- error: Exception,
- service: Optional[str] = None,
- region: Optional[str] = None,
- context: Optional[Dict[str, Any]] = None
- ) -> None:
- """
- Log an error for a specific task.
-
- Args:
- task_id: The task ID to associate the error with
- error: The exception that occurred
- service: Optional AWS service name
- region: Optional AWS region
- context: Additional context information
- """
- from app import db
- from app.models import TaskLog
- import json
-
- # Build error details
- details = {
- 'error_type': type(error).__name__,
- 'timestamp': datetime.now(timezone.utc).isoformat()
- }
-
- if service:
- details['service'] = service
- if region:
- details['region'] = region
- if context:
- details['context'] = context
-
- # Add stack trace
- if isinstance(error, AppError):
- details['code'] = error.code
- details['stack_trace'] = ''.join(error.stack_trace) if error.stack_trace else None
- else:
- details['stack_trace'] = traceback.format_exc()
-
- # Create task log entry
- log_entry = TaskLog(
- task_id=task_id,
- level='error',
- message=str(error),
- details=json.dumps(details)
- )
-
- db.session.add(log_entry)
- db.session.commit()
-
- # Also log to application logger
- log_error(error, context={'task_id': task_id, 'service': service, 'region': region})
-
- @staticmethod
- def log_task_warning(
- task_id: int,
- message: str,
- details: Optional[Dict[str, Any]] = None
- ) -> None:
- """Log a warning for a specific task"""
- from app import db
- from app.models import TaskLog
- import json
-
- log_entry = TaskLog(
- task_id=task_id,
- level='warning',
- message=message,
- details=json.dumps(details) if details else None
- )
-
- db.session.add(log_entry)
- db.session.commit()
-
- @staticmethod
- def log_task_info(
- task_id: int,
- message: str,
- details: Optional[Dict[str, Any]] = None
- ) -> None:
- """Log an info message for a specific task"""
- from app import db
- from app.models import TaskLog
- import json
-
- log_entry = TaskLog(
- task_id=task_id,
- level='info',
- message=message,
- details=json.dumps(details) if details else None
- )
-
- db.session.add(log_entry)
- db.session.commit()
-
- @staticmethod
- def get_task_errors(task_id: int) -> list:
- """
- Get all error logs for a specific task.
-
- Requirements:
- - 8.3: Display error logs associated with task
- """
- from app.models import TaskLog
-
- logs = TaskLog.query.filter_by(
- task_id=task_id,
- level='error'
- ).order_by(TaskLog.created_at.desc()).all()
-
- return [log.to_dict() for log in logs]
-
- @staticmethod
- def get_task_logs(
- task_id: int,
- level: Optional[str] = None,
- page: int = 1,
- page_size: int = 20
- ) -> Dict[str, Any]:
- """
- Get paginated logs for a specific task.
-
- Args:
- task_id: The task ID
- level: Optional filter by log level
- page: Page number
- page_size: Number of items per page
-
- Returns:
- Dictionary with logs and pagination info
- """
- from app.models import TaskLog
-
- query = TaskLog.query.filter_by(task_id=task_id)
-
- if level:
- query = query.filter_by(level=level)
-
- query = query.order_by(TaskLog.created_at.desc())
-
- # Paginate
- total = query.count()
- logs = query.offset((page - 1) * page_size).limit(page_size).all()
-
- return {
- 'data': [log.to_dict() for log in logs],
- 'pagination': {
- 'page': page,
- 'page_size': page_size,
- 'total': total,
- 'total_pages': (total + page_size - 1) // page_size
- }
- }
|