"""
Scan Data Processor Service

This module handles validation and conversion of uploaded CloudShell scan data.
It validates the JSON structure and converts the data to ScanResult objects
compatible with the existing ReportGenerator.

Requirements:
    - 4.2: Validate JSON structure completeness when receiving uploaded data
    - 5.1: Generate reports in the same format as existing scan tasks
"""

from typing import Any, Dict, List, Tuple, Optional
from datetime import datetime
import logging

from app.scanners.base import ResourceData, ScanResult

logger = logging.getLogger(__name__)


class ScanDataProcessor:
    """
    Processes uploaded CloudShell scan data.

    This class provides functionality to:
    - Validate the structure of uploaded JSON scan data
    - Convert validated data to ScanResult objects for report generation

    Requirements:
        - 4.2: Validate JSON structure completeness
        - 5.1: Convert to format compatible with existing ReportGenerator
    """

    # Required metadata fields based on design document ScanData interface
    REQUIRED_METADATA_FIELDS = [
        'account_id',
        'scan_timestamp',
        'regions_scanned',
        'services_scanned',
    ]

    # Optional metadata fields
    OPTIONAL_METADATA_FIELDS = [
        'scanner_version',
        'total_resources',
        'total_errors',
    ]

    # Required top-level fields
    REQUIRED_TOP_LEVEL_FIELDS = [
        'metadata',
        'resources',
        'errors',
    ]

    # Required resource fields based on ResourceData interface
    REQUIRED_RESOURCE_FIELDS = [
        'account_id',
        'region',
        'service',
        'resource_type',
        'resource_id',
        'name',
    ]

    # Required error fields based on ErrorData interface
    REQUIRED_ERROR_FIELDS = [
        'service',
        'region',
        'error',
        'error_type',
    ]

    def validate_scan_data(self, data: Dict[str, Any]) -> Tuple[bool, List[str]]:
        """
        Validate the structure of uploaded scan data.

        Checks that ``data`` is a dictionary containing every required
        top-level field, then validates the ``metadata``, ``resources`` and
        ``errors`` sections and collects all messages from the three sections.

        Args:
            data: Dictionary containing the uploaded scan data

        Returns:
            Tuple of (is_valid, error_messages):
            - is_valid: True if data is valid, False otherwise
            - error_messages: List of validation error messages (empty if valid)

        Requirements:
            - 4.2: Validate JSON structure completeness
            - 6.2: Return list of missing fields when validation fails

        Validates:
            - Property 2: JSON structure completeness (metadata, resources, errors)
            - Property 6: Returns all missing field names on validation failure
        """
        errors: List[str] = []

        # Anything other than a dict cannot be inspected further.
        if not isinstance(data, dict):
            errors.append("Data must be a JSON object (dictionary)")
            return False, errors

        # Validate top-level fields
        missing_top_level = self._validate_required_fields(
            data, self.REQUIRED_TOP_LEVEL_FIELDS, "top-level"
        )
        errors.extend(missing_top_level)

        # If top-level fields are missing, we can't continue validation
        if missing_top_level:
            return False, errors

        # Each section validator returns its own list of messages; an empty
        # list means that section is structurally sound.
        errors.extend(self._validate_metadata(data.get('metadata', {})))
        errors.extend(self._validate_resources(data.get('resources', {})))
        errors.extend(self._validate_errors_field(data.get('errors', [])))

        is_valid = not errors

        if is_valid:
            logger.info("Scan data validation passed")
        else:
            logger.warning(
                "Scan data validation failed with %s errors", len(errors)
            )

        return is_valid, errors

    def _validate_required_fields(
        self,
        data: Dict[str, Any],
        required_fields: List[str],
        context: str
    ) -> List[str]:
        """
        Validate that all required fields are present.

        Args:
            data: Dictionary to validate
            required_fields: List of required field names
            context: Context string for error messages

        Returns:
            A list holding a single message that names every missing field
            (in ``required_fields`` order), or an empty list when nothing is
            missing.
        """
        missing_fields = [
            field for field in required_fields if field not in data
        ]

        if not missing_fields:
            return []

        return [
            f"Missing required {context} fields: {', '.join(missing_fields)}"
        ]

    def _validate_metadata(self, metadata: Any) -> List[str]:
        """
        Validate the metadata section of scan data.

        Presence of the required fields is checked first; type checks only
        run when every required field is present.

        Args:
            metadata: Metadata dictionary to validate

        Returns:
            List of validation error messages

        Validates:
            - Property 2: metadata.account_id, metadata.scan_timestamp,
              metadata.regions_scanned, metadata.services_scanned
        """
        # Check if metadata is a dictionary
        if not isinstance(metadata, dict):
            return ["metadata must be a JSON object (dictionary)"]

        # Reuse the shared helper so the missing-field message is built in
        # one place; with context "metadata" it yields
        # "Missing required metadata fields: ...".
        missing_errors = self._validate_required_fields(
            metadata, self.REQUIRED_METADATA_FIELDS, "metadata"
        )
        if missing_errors:
            return missing_errors

        errors: List[str] = []

        # Validate field types
        if not isinstance(metadata.get('account_id'), str):
            errors.append("metadata.account_id must be a string")

        if not isinstance(metadata.get('scan_timestamp'), str):
            errors.append("metadata.scan_timestamp must be a string (ISO 8601 format)")
        else:
            # Only a string can be checked for timestamp format.
            timestamp_error = self._validate_timestamp(metadata['scan_timestamp'])
            if timestamp_error:
                errors.append(timestamp_error)

        if not isinstance(metadata.get('regions_scanned'), list):
            errors.append("metadata.regions_scanned must be an array")

        if not isinstance(metadata.get('services_scanned'), list):
            errors.append("metadata.services_scanned must be an array")

        return errors

    def _validate_timestamp(self, timestamp: str) -> Optional[str]:
        """
        Validate ISO 8601 timestamp format.

        A set of explicit ``strptime`` formats is tried first, followed by
        ``datetime.fromisoformat`` as a fallback. For the fallback, a trailing
        ``Z`` is rewritten to ``+00:00``; a ``Z`` anywhere else in the string
        is treated as invalid.

        Args:
            timestamp: Timestamp string to validate

        Returns:
            Error message if invalid, None if valid
        """
        # Try parsing common ISO 8601 formats
        formats = (
            '%Y-%m-%dT%H:%M:%S.%fZ',
            '%Y-%m-%dT%H:%M:%SZ',
            '%Y-%m-%dT%H:%M:%S.%f+00:00',
            '%Y-%m-%dT%H:%M:%S+00:00',
            '%Y-%m-%dT%H:%M:%S.%f',
            '%Y-%m-%dT%H:%M:%S',
        )

        for fmt in formats:
            try:
                datetime.strptime(timestamp, fmt)
            except ValueError:
                continue
            return None

        # Try fromisoformat as fallback (Python 3.7+).
        # Only a *trailing* 'Z' is the UTC designator. Replacing every 'Z'
        # would turn malformed input such as '2024-01-01T10:00Z:00' into
        # '2024-01-01T10:00+00:00:00', which fromisoformat accepts.
        if timestamp.endswith('Z'):
            candidate = timestamp[:-1] + '+00:00'
        else:
            candidate = timestamp

        # Any 'Z' still present is not a trailing designator, so the string
        # is rejected without consulting fromisoformat.
        if 'Z' not in candidate:
            try:
                datetime.fromisoformat(candidate)
            except ValueError:
                pass
            else:
                return None

        return f"metadata.scan_timestamp '{timestamp}' is not a valid ISO 8601 timestamp"

    def _validate_resources(self, resources: Any) -> List[str]:
        """
        Validate the resources section of scan data.

        Args:
            resources: Resources dictionary to validate; each key is a
                service name mapped to a list of resource entries

        Returns:
            List of validation error messages

        Validates:
            - Property 2: resources organized by service type
        """
        # Check if resources is a dictionary
        if not isinstance(resources, dict):
            return [
                "resources must be a JSON object (dictionary) organized by service type"
            ]

        errors: List[str] = []

        # Validate each service's resources
        for service, resource_list in resources.items():
            if not isinstance(resource_list, list):
                errors.append(f"resources.{service} must be an array of resources")
                continue

            # Validate each resource in the list
            for idx, resource in enumerate(resource_list):
                errors.extend(self._validate_resource(resource, service, idx))

        return errors

    def _validate_resource(
        self,
        resource: Any,
        service: str,
        index: int
    ) -> List[str]:
        """
        Validate a single resource entry.

        Args:
            resource: Resource dictionary to validate
            service: Service name for context
            index: Index in the resource list for context

        Returns:
            List of validation error messages
        """
        context = f"resources.{service}[{index}]"

        if not isinstance(resource, dict):
            return [f"{context} must be a JSON object (dictionary)"]

        errors: List[str] = []

        # Check required resource fields
        missing_fields = [
            field for field in self.REQUIRED_RESOURCE_FIELDS
            if field not in resource
        ]
        if missing_fields:
            errors.append(
                f"{context} missing required fields: {', '.join(missing_fields)}"
            )

        # Validate attributes field if present (should be a dict). This check
        # runs even when required fields are missing, so both problems are
        # reported together.
        if 'attributes' in resource and not isinstance(resource['attributes'], dict):
            errors.append(f"{context}.attributes must be a JSON object (dictionary)")

        return errors

    def _validate_errors_field(self, errors_list: Any) -> List[str]:
        """
        Validate the errors section of scan data.

        Args:
            errors_list: Errors list to validate

        Returns:
            List of validation error messages

        Validates:
            - Property 2: errors list with error information
        """
        # Check if errors is a list
        if not isinstance(errors_list, list):
            return ["errors must be an array"]

        validation_errors: List[str] = []

        # Validate each error entry
        for idx, error_entry in enumerate(errors_list):
            validation_errors.extend(self._validate_error_entry(error_entry, idx))

        return validation_errors

    def _validate_error_entry(self, error_entry: Any, index: int) -> List[str]:
        """
        Validate a single error entry.

        Args:
            error_entry: Error dictionary to validate
            index: Index in the errors list for context

        Returns:
            List of validation error messages
        """
        context = f"errors[{index}]"

        if not isinstance(error_entry, dict):
            return [f"{context} must be a JSON object (dictionary)"]

        # Check required error fields
        missing_fields = [
            field for field in self.REQUIRED_ERROR_FIELDS
            if field not in error_entry
        ]
        if not missing_fields:
            return []

        return [
            f"{context} missing required fields: {', '.join(missing_fields)}"
        ]

    def convert_to_scan_result(self, data: Dict[str, Any]) -> ScanResult:
        """
        Convert uploaded JSON data to a ScanResult object.

        Builds a ``ScanResult`` with ``success=True``, adds every resource
        and error entry from ``data`` to it, and sets ``result.metadata``
        from the uploaded metadata. Totals are recomputed from the converted
        result rather than copied from the upload, and ``source`` is set to
        ``'upload'``.

        Args:
            data: Validated scan data dictionary

        Returns:
            ScanResult object compatible with ReportGenerator

        Requirements:
            - 5.1: Generate reports in the same format as existing scan tasks

        Note:
            This method assumes the data has already been validated using
            validate_scan_data(). Calling this with invalid data may raise
            exceptions.

        Validates:
            - Property 8: Report generation consistency - converts to same
              format used by credential-based scanning
        """
        metadata = data.get('metadata', {})
        resources_data = data.get('resources', {})
        errors_data = data.get('errors', [])

        # Create ScanResult with success=True (we have valid data)
        result = ScanResult(success=True)

        # Convert resources to ResourceData objects
        for service, resource_list in resources_data.items():
            for resource_dict in resource_list:
                resource = self._convert_resource(resource_dict)
                result.add_resource(service, resource)

        # Add errors
        for error_dict in errors_data:
            result.add_error(
                service=error_dict.get('service', 'unknown'),
                region=error_dict.get('region', 'unknown'),
                error=error_dict.get('error', 'Unknown error'),
                details=error_dict.get('details'),
                error_type=error_dict.get('error_type', 'Unknown'),
            )

        # Set metadata
        result.metadata = {
            'account_id': metadata.get('account_id', ''),
            'regions_scanned': metadata.get('regions_scanned', []),
            'services_scanned': metadata.get('services_scanned', []),
            'total_resources': sum(len(r) for r in result.resources.values()),
            'total_errors': len(result.errors),
            'scan_timestamp': metadata.get('scan_timestamp', ''),
            'scanner_version': metadata.get('scanner_version', ''),
            'source': 'upload',  # Mark as uploaded data
        }

        logger.info(
            "Converted scan data: %s resources, %s errors",
            result.metadata['total_resources'],
            result.metadata['total_errors'],
        )

        return result

    def _convert_resource(self, resource_dict: Dict[str, Any]) -> ResourceData:
        """
        Convert a resource dictionary to a ResourceData object.

        Missing string fields default to ``''`` and a missing ``attributes``
        entry defaults to an empty dictionary.

        Args:
            resource_dict: Resource dictionary from uploaded data

        Returns:
            ResourceData object
        """
        return ResourceData(
            account_id=resource_dict.get('account_id', ''),
            region=resource_dict.get('region', ''),
            service=resource_dict.get('service', ''),
            resource_type=resource_dict.get('resource_type', ''),
            resource_id=resource_dict.get('resource_id', ''),
            name=resource_dict.get('name', ''),
            attributes=resource_dict.get('attributes', {}),
        )