""" Cloud Provider Scanner Abstract Base Class This module defines the abstract interface for cloud provider scanners, enabling extensibility to support multiple cloud providers (AWS, Azure, GCP, etc.) """ from abc import ABC, abstractmethod from typing import List, Dict, Any, Optional from dataclasses import dataclass, field @dataclass class ResourceData: """ Unified resource data structure for all cloud providers. Attributes: account_id: Cloud provider account identifier region: Region where the resource is located (or 'global') service: Service name (e.g., 'ec2', 'vpc', 's3') resource_type: Type of resource within the service resource_id: Unique identifier for the resource name: Human-readable name of the resource attributes: Service-specific attributes dictionary """ account_id: str region: str service: str resource_type: str resource_id: str name: str attributes: Dict[str, Any] = field(default_factory=dict) def to_dict(self) -> Dict[str, Any]: """Convert resource data to dictionary""" return { 'account_id': self.account_id, 'region': self.region, 'service': self.service, 'resource_type': self.resource_type, 'resource_id': self.resource_id, 'name': self.name, 'attributes': self.attributes } @dataclass class ScanResult: """ Result of a cloud resource scan operation. Attributes: success: Whether the scan completed successfully resources: Dictionary mapping service names to lists of ResourceData errors: List of error messages encountered during scanning metadata: Additional scan metadata (timing, counts, etc.) """ success: bool resources: Dict[str, List[ResourceData]] = field(default_factory=dict) errors: List[Dict[str, Any]] = field(default_factory=list) metadata: Dict[str, Any] = field(default_factory=dict) def to_dict(self) -> Dict[str, Any]: """Convert scan result to dictionary""" return { 'success': self.success, 'resources': { service: [r.to_dict() for r in resources] for service, resources in self.resources.items() }, 'errors': self.errors, 'metadata': self.metadata } def add_resource(self, service: str, resource: ResourceData) -> None: """Add a resource to the scan result""" if service not in self.resources: self.resources[service] = [] self.resources[service].append(resource) def add_error(self, service: str, region: str, error: str, details: Optional[str] = None, **kwargs) -> None: """Add an error to the scan result""" error_entry = { 'service': service, 'region': region, 'error': error, 'details': details } # Include any additional error info (like error_type) error_entry.update(kwargs) self.errors.append(error_entry) class CloudProviderScanner(ABC): """ Abstract base class for cloud provider scanners. This interface defines the contract that all cloud provider scanners must implement, enabling the system to support multiple cloud providers with a unified API. Requirements: - 10.1: Use an abstract interface that can be implemented for different cloud providers - 10.2: Adding a new cloud provider only requires implementing the provider-specific scanner """ @abstractmethod def get_credentials(self, credential_config: Dict[str, Any]) -> Any: """ Get cloud provider credentials/session from configuration. Args: credential_config: Dictionary containing credential configuration (e.g., access keys, role ARN, etc.) Returns: Provider-specific credential object (e.g., boto3.Session for AWS) Raises: CredentialError: If credentials are invalid or cannot be obtained """ pass @abstractmethod def list_regions(self) -> List[str]: """ List all available regions for the cloud provider. Returns: List of region identifiers (e.g., ['us-east-1', 'eu-west-1'] for AWS) """ pass @abstractmethod def scan_resources( self, regions: List[str], services: Optional[List[str]] = None, progress_callback: Optional[callable] = None ) -> ScanResult: """ Scan cloud resources across specified regions and services. Args: regions: List of regions to scan services: Optional list of services to scan (None = all supported services) progress_callback: Optional callback function for progress updates Signature: callback(current: int, total: int, message: str) Returns: ScanResult containing all discovered resources and any errors """ pass @abstractmethod def validate_credentials(self) -> bool: """ Validate that the current credentials are valid and have necessary permissions. Returns: True if credentials are valid, False otherwise """ pass @abstractmethod def get_account_id(self) -> str: """ Get the account/subscription ID for the current credentials. Returns: Account identifier string """ pass @property @abstractmethod def provider_name(self) -> str: """ Get the name of the cloud provider. Returns: Provider name (e.g., 'AWS', 'Azure', 'GCP') """ pass @property @abstractmethod def supported_services(self) -> List[str]: """ Get list of services supported by this scanner. Returns: List of service identifiers """ pass @property @abstractmethod def global_services(self) -> List[str]: """ Get list of global services (not region-specific). Returns: List of global service identifiers """ pass