base.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. """
  2. Cloud Provider Scanner Abstract Base Class
  3. This module defines the abstract interface for cloud provider scanners,
  4. enabling extensibility to support multiple cloud providers (AWS, Azure, GCP, etc.)
  5. """
  6. from abc import ABC, abstractmethod
  7. from typing import List, Dict, Any, Optional
  8. from dataclasses import dataclass, field
  9. @dataclass
  10. class ResourceData:
  11. """
  12. Unified resource data structure for all cloud providers.
  13. Attributes:
  14. account_id: Cloud provider account identifier
  15. region: Region where the resource is located (or 'global')
  16. service: Service name (e.g., 'ec2', 'vpc', 's3')
  17. resource_type: Type of resource within the service
  18. resource_id: Unique identifier for the resource
  19. name: Human-readable name of the resource
  20. attributes: Service-specific attributes dictionary
  21. """
  22. account_id: str
  23. region: str
  24. service: str
  25. resource_type: str
  26. resource_id: str
  27. name: str
  28. attributes: Dict[str, Any] = field(default_factory=dict)
  29. def to_dict(self) -> Dict[str, Any]:
  30. """Convert resource data to dictionary"""
  31. return {
  32. 'account_id': self.account_id,
  33. 'region': self.region,
  34. 'service': self.service,
  35. 'resource_type': self.resource_type,
  36. 'resource_id': self.resource_id,
  37. 'name': self.name,
  38. 'attributes': self.attributes
  39. }
  40. @dataclass
  41. class ScanResult:
  42. """
  43. Result of a cloud resource scan operation.
  44. Attributes:
  45. success: Whether the scan completed successfully
  46. resources: Dictionary mapping service names to lists of ResourceData
  47. errors: List of error messages encountered during scanning
  48. metadata: Additional scan metadata (timing, counts, etc.)
  49. """
  50. success: bool
  51. resources: Dict[str, List[ResourceData]] = field(default_factory=dict)
  52. errors: List[Dict[str, Any]] = field(default_factory=list)
  53. metadata: Dict[str, Any] = field(default_factory=dict)
  54. def to_dict(self) -> Dict[str, Any]:
  55. """Convert scan result to dictionary"""
  56. return {
  57. 'success': self.success,
  58. 'resources': {
  59. service: [r.to_dict() for r in resources]
  60. for service, resources in self.resources.items()
  61. },
  62. 'errors': self.errors,
  63. 'metadata': self.metadata
  64. }
  65. def add_resource(self, service: str, resource: ResourceData) -> None:
  66. """Add a resource to the scan result"""
  67. if service not in self.resources:
  68. self.resources[service] = []
  69. self.resources[service].append(resource)
  70. def add_error(self, service: str, region: str, error: str, details: Optional[str] = None, **kwargs) -> None:
  71. """Add an error to the scan result"""
  72. error_entry = {
  73. 'service': service,
  74. 'region': region,
  75. 'error': error,
  76. 'details': details
  77. }
  78. # Include any additional error info (like error_type)
  79. error_entry.update(kwargs)
  80. self.errors.append(error_entry)
  81. class CloudProviderScanner(ABC):
  82. """
  83. Abstract base class for cloud provider scanners.
  84. This interface defines the contract that all cloud provider scanners must implement,
  85. enabling the system to support multiple cloud providers with a unified API.
  86. Requirements:
  87. - 10.1: Use an abstract interface that can be implemented for different cloud providers
  88. - 10.2: Adding a new cloud provider only requires implementing the provider-specific scanner
  89. """
  90. @abstractmethod
  91. def get_credentials(self, credential_config: Dict[str, Any]) -> Any:
  92. """
  93. Get cloud provider credentials/session from configuration.
  94. Args:
  95. credential_config: Dictionary containing credential configuration
  96. (e.g., access keys, role ARN, etc.)
  97. Returns:
  98. Provider-specific credential object (e.g., boto3.Session for AWS)
  99. Raises:
  100. CredentialError: If credentials are invalid or cannot be obtained
  101. """
  102. pass
  103. @abstractmethod
  104. def list_regions(self) -> List[str]:
  105. """
  106. List all available regions for the cloud provider.
  107. Returns:
  108. List of region identifiers (e.g., ['us-east-1', 'eu-west-1'] for AWS)
  109. """
  110. pass
  111. @abstractmethod
  112. def scan_resources(
  113. self,
  114. regions: List[str],
  115. services: Optional[List[str]] = None,
  116. progress_callback: Optional[callable] = None
  117. ) -> ScanResult:
  118. """
  119. Scan cloud resources across specified regions and services.
  120. Args:
  121. regions: List of regions to scan
  122. services: Optional list of services to scan (None = all supported services)
  123. progress_callback: Optional callback function for progress updates
  124. Signature: callback(current: int, total: int, message: str)
  125. Returns:
  126. ScanResult containing all discovered resources and any errors
  127. """
  128. pass
  129. @abstractmethod
  130. def validate_credentials(self) -> bool:
  131. """
  132. Validate that the current credentials are valid and have necessary permissions.
  133. Returns:
  134. True if credentials are valid, False otherwise
  135. """
  136. pass
  137. @abstractmethod
  138. def get_account_id(self) -> str:
  139. """
  140. Get the account/subscription ID for the current credentials.
  141. Returns:
  142. Account identifier string
  143. """
  144. pass
  145. @property
  146. @abstractmethod
  147. def provider_name(self) -> str:
  148. """
  149. Get the name of the cloud provider.
  150. Returns:
  151. Provider name (e.g., 'AWS', 'Azure', 'GCP')
  152. """
  153. pass
  154. @property
  155. @abstractmethod
  156. def supported_services(self) -> List[str]:
  157. """
  158. Get list of services supported by this scanner.
  159. Returns:
  160. List of service identifiers
  161. """
  162. pass
  163. @property
  164. @abstractmethod
  165. def global_services(self) -> List[str]:
  166. """
  167. Get list of global services (not region-specific).
  168. Returns:
  169. List of global service identifiers
  170. """
  171. pass