| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206 |
"""
Cloud Provider Scanner Abstract Base Class.

This module defines the abstract interface for cloud provider scanners,
enabling extensibility to support multiple cloud providers (AWS, Azure, GCP, etc.).
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional
@dataclass
class ResourceData:
    """
    Provider-agnostic record describing a single cloud resource.

    Attributes:
        account_id: Cloud provider account identifier
        region: Region where the resource is located (or 'global')
        service: Service name (e.g., 'ec2', 'vpc', 's3')
        resource_type: Type of resource within the service
        resource_id: Unique identifier for the resource
        name: Human-readable name of the resource
        attributes: Service-specific attributes dictionary (empty by default)
    """
    account_id: str
    region: str
    service: str
    resource_type: str
    resource_id: str
    name: str
    attributes: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return the resource as a plain dictionary keyed by field name.

        The ``attributes`` entry is the very dict held by this instance,
        not a copy.
        """
        # Keys are listed explicitly so the output order is fixed.
        exported_keys = (
            'account_id',
            'region',
            'service',
            'resource_type',
            'resource_id',
            'name',
            'attributes',
        )
        return {key: getattr(self, key) for key in exported_keys}
@dataclass
class ScanResult:
    """
    Outcome of a cloud resource scan operation.

    Attributes:
        success: Whether the scan completed successfully
        resources: Dictionary mapping service names to lists of ResourceData
        errors: List of error entries (dictionaries shaped like the ones
            built by ``add_error``)
        metadata: Additional scan metadata (timing, counts, etc.)
    """
    success: bool
    resources: Dict[str, List[ResourceData]] = field(default_factory=dict)
    errors: List[Dict[str, Any]] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return the scan result as a plain dictionary.

        Each resource is converted through its own ``to_dict``; ``errors``
        and ``metadata`` are passed through as the same objects held here.
        """
        serialized: Dict[str, Any] = {}
        for service_name, items in self.resources.items():
            serialized[service_name] = [item.to_dict() for item in items]
        return dict(
            success=self.success,
            resources=serialized,
            errors=self.errors,
            metadata=self.metadata,
        )

    def add_resource(self, service: str, resource: ResourceData) -> None:
        """Append ``resource`` to the list for ``service``, creating it if absent."""
        self.resources.setdefault(service, []).append(resource)

    def add_error(self, service: str, region: str, error: str, details: Optional[str] = None, **kwargs) -> None:
        """Record an error entry for the given service and region.

        Extra keyword arguments (like error_type) are stored in the entry
        after the four standard keys.
        """
        self.errors.append({
            'service': service,
            'region': region,
            'error': error,
            'details': details,
            **kwargs,
        })
class CloudProviderScanner(ABC):
    """
    Abstract base class for cloud provider scanners.

    This interface defines the contract that all cloud provider scanners must
    implement, enabling the system to support multiple cloud providers with a
    unified API. Every method and property below is abstract, so a subclass
    must override all of them before it can be instantiated.

    Requirements:
        - 10.1: Use an abstract interface that can be implemented for different cloud providers
        - 10.2: Adding a new cloud provider only requires implementing the provider-specific scanner
    """

    @abstractmethod
    def get_credentials(self, credential_config: Dict[str, Any]) -> Any:
        """
        Get cloud provider credentials/session from configuration.

        Args:
            credential_config: Dictionary containing credential configuration
                (e.g., access keys, role ARN, etc.)

        Returns:
            Provider-specific credential object (e.g., boto3.Session for AWS)

        Raises:
            CredentialError: If credentials are invalid or cannot be obtained.
                NOTE(review): this exception type is not defined in this
                module; confirm where implementations import it from.
        """

    @abstractmethod
    def list_regions(self) -> List[str]:
        """
        List all available regions for the cloud provider.

        Returns:
            List of region identifiers (e.g., ['us-east-1', 'eu-west-1'] for AWS)
        """

    @abstractmethod
    def scan_resources(
        self,
        regions: List[str],
        services: Optional[List[str]] = None,
        # `Callable[...]` replaces the builtin function `callable`, which is
        # not a type and is rejected by static type checkers.
        progress_callback: Optional[Callable[[int, int, str], Any]] = None
    ) -> ScanResult:
        """
        Scan cloud resources across specified regions and services.

        Args:
            regions: List of regions to scan
            services: Optional list of services to scan (None = all supported services)
            progress_callback: Optional callback function for progress updates
                Signature: callback(current: int, total: int, message: str)

        Returns:
            ScanResult containing all discovered resources and any errors
        """

    @abstractmethod
    def validate_credentials(self) -> bool:
        """
        Validate that the current credentials are valid and have necessary permissions.

        Returns:
            True if credentials are valid, False otherwise
        """

    @abstractmethod
    def get_account_id(self) -> str:
        """
        Get the account/subscription ID for the current credentials.

        Returns:
            Account identifier string
        """

    @property
    @abstractmethod
    def provider_name(self) -> str:
        """
        Get the name of the cloud provider.

        Returns:
            Provider name (e.g., 'AWS', 'Azure', 'GCP')
        """

    @property
    @abstractmethod
    def supported_services(self) -> List[str]:
        """
        Get list of services supported by this scanner.

        Returns:
            List of service identifiers
        """

    @property
    @abstractmethod
    def global_services(self) -> List[str]:
        """
        Get list of global services (not region-specific).

        Returns:
            List of global service identifiers
        """
|