"""
Scan Data Processor Service

This module handles validation and conversion of uploaded CloudShell scan data.
It validates the JSON structure and converts the data to ScanResult objects
compatible with the existing ReportGenerator.

Requirements:
- 4.2: Validate JSON structure completeness when receiving uploaded data
- 5.1: Generate reports in the same format as existing scan tasks
"""

from typing import Any, Dict, List, Tuple, Optional
from datetime import datetime
import logging

from app.scanners.base import ResourceData, ScanResult

logger = logging.getLogger(__name__)
class ScanDataProcessor:
    """
    Processes uploaded CloudShell scan data.

    This class provides functionality to:
    - Validate the structure of uploaded JSON scan data
    - Convert validated data to ScanResult objects for report generation

    Requirements:
    - 4.2: Validate JSON structure completeness
    - 5.1: Convert to format compatible with existing ReportGenerator
    """

    # Required metadata fields based on the design document's ScanData interface
    REQUIRED_METADATA_FIELDS = [
        'account_id',
        'scan_timestamp',
        'regions_scanned',
        'services_scanned',
    ]

    # Optional metadata fields (accepted when present, never required)
    OPTIONAL_METADATA_FIELDS = [
        'scanner_version',
        'total_resources',
        'total_errors',
    ]

    # Required top-level fields of the uploaded document
    REQUIRED_TOP_LEVEL_FIELDS = [
        'metadata',
        'resources',
        'errors',
    ]

    # Required resource fields based on the ResourceData interface
    REQUIRED_RESOURCE_FIELDS = [
        'account_id',
        'region',
        'service',
        'resource_type',
        'resource_id',
        'name',
    ]

    # Required error fields based on the ErrorData interface
    REQUIRED_ERROR_FIELDS = [
        'service',
        'region',
        'error',
        'error_type',
    ]

    # Accepted ISO 8601 layouts, hoisted to the class so the tuple is built
    # once instead of on every _validate_timestamp call.
    _TIMESTAMP_FORMATS = (
        '%Y-%m-%dT%H:%M:%S.%fZ',
        '%Y-%m-%dT%H:%M:%SZ',
        '%Y-%m-%dT%H:%M:%S.%f+00:00',
        '%Y-%m-%dT%H:%M:%S+00:00',
        '%Y-%m-%dT%H:%M:%S.%f',
        '%Y-%m-%dT%H:%M:%S',
    )

    def validate_scan_data(self, data: Dict[str, Any]) -> Tuple[bool, List[str]]:
        """
        Validate the structure of uploaded scan data.

        This method performs comprehensive validation of the JSON structure
        to ensure it conforms to the ScanData interface defined in the design.

        Args:
            data: Dictionary containing the uploaded scan data

        Returns:
            Tuple of (is_valid, error_messages):
            - is_valid: True if data is valid, False otherwise
            - error_messages: List of validation error messages (empty if valid)

        Requirements:
        - 4.2: Validate JSON structure completeness
        - 6.2: Return list of missing fields when validation fails

        Validates:
        - Property 2: JSON structure completeness (metadata, resources, errors)
        - Property 6: Returns all missing field names on validation failure
        """
        errors: List[str] = []

        # Anything other than a JSON object is unvalidatable; stop immediately.
        if not isinstance(data, dict):
            errors.append("Data must be a JSON object (dictionary)")
            return False, errors

        missing_top_level = self._validate_required_fields(
            data,
            self.REQUIRED_TOP_LEVEL_FIELDS,
            "top-level",
        )
        errors.extend(missing_top_level)

        # Without the top-level sections there is nothing further to inspect.
        if missing_top_level:
            return False, errors

        errors.extend(self._validate_metadata(data.get('metadata', {})))
        errors.extend(self._validate_resources(data.get('resources', {})))
        errors.extend(self._validate_errors_field(data.get('errors', [])))

        is_valid = not errors

        if is_valid:
            logger.info("Scan data validation passed")
        else:
            # Lazy %-style args: the message is only formatted when emitted.
            logger.warning("Scan data validation failed with %d errors", len(errors))

        return is_valid, errors

    @staticmethod
    def _missing_fields(data: Dict[str, Any], required_fields: List[str]) -> List[str]:
        """
        Return the names from required_fields that are absent from data.

        Order follows required_fields so error messages are deterministic.

        Args:
            data: Dictionary to check
            required_fields: Field names that must be present

        Returns:
            List of missing field names (empty if all present)
        """
        return [field for field in required_fields if field not in data]

    def _validate_required_fields(
        self,
        data: Dict[str, Any],
        required_fields: List[str],
        context: str
    ) -> List[str]:
        """
        Validate that all required fields are present.

        Args:
            data: Dictionary to validate
            required_fields: List of required field names
            context: Context string for error messages

        Returns:
            List of error messages for missing fields (at most one message,
            naming every missing field)
        """
        missing_fields = self._missing_fields(data, required_fields)
        if not missing_fields:
            return []
        return [
            f"Missing required {context} fields: {', '.join(missing_fields)}"
        ]

    def _validate_metadata(self, metadata: Any) -> List[str]:
        """
        Validate the metadata section of scan data.

        Args:
            metadata: Metadata dictionary to validate

        Returns:
            List of validation error messages

        Validates:
        - Property 2: metadata.account_id, metadata.scan_timestamp,
          metadata.regions_scanned, metadata.services_scanned
        """
        if not isinstance(metadata, dict):
            return ["metadata must be a JSON object (dictionary)"]

        # Reuse the shared presence check; with context "metadata" it yields
        # exactly the message this method has always produced.
        errors = self._validate_required_fields(
            metadata,
            self.REQUIRED_METADATA_FIELDS,
            "metadata",
        )
        if errors:
            # Type-checking absent fields would only add misleading noise.
            return errors

        if not isinstance(metadata.get('account_id'), str):
            errors.append("metadata.account_id must be a string")

        if not isinstance(metadata.get('scan_timestamp'), str):
            errors.append("metadata.scan_timestamp must be a string (ISO 8601 format)")
        else:
            timestamp_error = self._validate_timestamp(metadata['scan_timestamp'])
            if timestamp_error:
                errors.append(timestamp_error)

        if not isinstance(metadata.get('regions_scanned'), list):
            errors.append("metadata.regions_scanned must be an array")

        if not isinstance(metadata.get('services_scanned'), list):
            errors.append("metadata.services_scanned must be an array")

        return errors

    def _validate_timestamp(self, timestamp: str) -> Optional[str]:
        """
        Validate ISO 8601 timestamp format.

        Args:
            timestamp: Timestamp string to validate

        Returns:
            Error message if invalid, None if valid
        """
        # Try the explicitly supported ISO 8601 layouts first.
        for fmt in self._TIMESTAMP_FORMATS:
            try:
                datetime.strptime(timestamp, fmt)
                return None
            except ValueError:
                continue

        # Fallback: datetime.fromisoformat (Python 3.7+). Only a *trailing*
        # 'Z' designates UTC, so rewrite just the suffix rather than every
        # 'Z' in the string (str.replace would also mangle invalid inputs
        # containing an interior 'Z').
        candidate = timestamp[:-1] + '+00:00' if timestamp.endswith('Z') else timestamp
        try:
            datetime.fromisoformat(candidate)
            return None
        except ValueError:
            return f"metadata.scan_timestamp '{timestamp}' is not a valid ISO 8601 timestamp"

    def _validate_resources(self, resources: Any) -> List[str]:
        """
        Validate the resources section of scan data.

        Args:
            resources: Resources dictionary to validate

        Returns:
            List of validation error messages

        Validates:
        - Property 2: resources organized by service type
        """
        if not isinstance(resources, dict):
            return ["resources must be a JSON object (dictionary) organized by service type"]

        errors: List[str] = []

        # Each service key maps to a list of resource dicts.
        for service, resource_list in resources.items():
            if not isinstance(resource_list, list):
                errors.append(f"resources.{service} must be an array of resources")
                continue

            for idx, resource in enumerate(resource_list):
                errors.extend(self._validate_resource(resource, service, idx))

        return errors

    def _validate_resource(
        self,
        resource: Any,
        service: str,
        index: int
    ) -> List[str]:
        """
        Validate a single resource entry.

        Args:
            resource: Resource dictionary to validate
            service: Service name for context
            index: Index in the resource list for context

        Returns:
            List of validation error messages
        """
        context = f"resources.{service}[{index}]"

        if not isinstance(resource, dict):
            return [f"{context} must be a JSON object (dictionary)"]

        errors: List[str] = []

        missing_fields = self._missing_fields(resource, self.REQUIRED_RESOURCE_FIELDS)
        if missing_fields:
            errors.append(
                f"{context} missing required fields: {', '.join(missing_fields)}"
            )

        # 'attributes' is optional, but when present it must be an object.
        if 'attributes' in resource and not isinstance(resource['attributes'], dict):
            errors.append(f"{context}.attributes must be a JSON object (dictionary)")

        return errors

    def _validate_errors_field(self, errors_list: Any) -> List[str]:
        """
        Validate the errors section of scan data.

        Args:
            errors_list: Errors list to validate

        Returns:
            List of validation error messages

        Validates:
        - Property 2: errors list with error information
        """
        if not isinstance(errors_list, list):
            return ["errors must be an array"]

        validation_errors: List[str] = []
        for idx, error_entry in enumerate(errors_list):
            validation_errors.extend(self._validate_error_entry(error_entry, idx))

        return validation_errors

    def _validate_error_entry(self, error_entry: Any, index: int) -> List[str]:
        """
        Validate a single error entry.

        Args:
            error_entry: Error dictionary to validate
            index: Index in the errors list for context

        Returns:
            List of validation error messages
        """
        context = f"errors[{index}]"

        if not isinstance(error_entry, dict):
            return [f"{context} must be a JSON object (dictionary)"]

        missing_fields = self._missing_fields(error_entry, self.REQUIRED_ERROR_FIELDS)
        if not missing_fields:
            return []
        return [
            f"{context} missing required fields: {', '.join(missing_fields)}"
        ]

    def convert_to_scan_result(self, data: Dict[str, Any]) -> ScanResult:
        """
        Convert uploaded JSON data to a ScanResult object.

        This method transforms the uploaded scan data into a ScanResult object
        that is compatible with the existing ReportGenerator service.

        Args:
            data: Validated scan data dictionary

        Returns:
            ScanResult object compatible with ReportGenerator

        Requirements:
        - 5.1: Generate reports in the same format as existing scan tasks

        Note:
            This method assumes the data has already been validated using
            validate_scan_data(). Calling this with invalid data may raise
            exceptions.

        Validates:
        - Property 8: Report generation consistency - converts to same format
          used by credential-based scanning
        """
        metadata = data.get('metadata', {})
        resources_data = data.get('resources', {})
        errors_data = data.get('errors', [])

        # The data passed validation, so the result itself is a success.
        result = ScanResult(success=True)

        # Convert resources to ResourceData objects, keyed by service.
        for service, resource_list in resources_data.items():
            for resource_dict in resource_list:
                result.add_resource(service, self._convert_resource(resource_dict))

        # Carry over reported scan errors, tolerating partially-filled entries.
        for error_dict in errors_data:
            result.add_error(
                service=error_dict.get('service', 'unknown'),
                region=error_dict.get('region', 'unknown'),
                error=error_dict.get('error', 'Unknown error'),
                details=error_dict.get('details'),
                error_type=error_dict.get('error_type', 'Unknown'),
            )

        # Recompute totals from what was actually converted rather than
        # trusting the uploaded (optional) counters.
        result.metadata = {
            'account_id': metadata.get('account_id', ''),
            'regions_scanned': metadata.get('regions_scanned', []),
            'services_scanned': metadata.get('services_scanned', []),
            'total_resources': sum(len(r) for r in result.resources.values()),
            'total_errors': len(result.errors),
            'scan_timestamp': metadata.get('scan_timestamp', ''),
            'scanner_version': metadata.get('scanner_version', ''),
            'source': 'upload',  # Mark as uploaded data
        }

        # Lazy %-style args: formatted only if INFO is enabled.
        logger.info(
            "Converted scan data: %s resources, %s errors",
            result.metadata['total_resources'],
            result.metadata['total_errors'],
        )

        return result

    def _convert_resource(self, resource_dict: Dict[str, Any]) -> ResourceData:
        """
        Convert a resource dictionary to a ResourceData object.

        Args:
            resource_dict: Resource dictionary from uploaded data

        Returns:
            ResourceData object (missing fields default to empty values)
        """
        return ResourceData(
            account_id=resource_dict.get('account_id', ''),
            region=resource_dict.get('region', ''),
            service=resource_dict.get('service', ''),
            resource_type=resource_dict.get('resource_type', ''),
            resource_id=resource_dict.get('resource_id', ''),
            name=resource_dict.get('name', ''),
            attributes=resource_dict.get('attributes', {}),
        )
|