""" AWS Resource Scanner Implementation This module implements the CloudProviderScanner interface for AWS, providing comprehensive resource scanning capabilities across all supported services. Requirements: - 4.3: Process multiple accounts/regions in parallel - 4.4: Scan all supported AWS services concurrently - 5.5: Retry with exponential backoff up to 3 times - 8.2: Record error details in task record """ import boto3 from botocore.exceptions import ClientError, BotoCoreError from concurrent.futures import ThreadPoolExecutor, as_completed from typing import List, Dict, Any, Optional, Callable import logging import traceback from app.scanners.base import CloudProviderScanner, ResourceData, ScanResult from app.scanners.credentials import AWSCredentialProvider, CredentialError from app.scanners.utils import retry_with_backoff logger = logging.getLogger(__name__) class AWSScanner(CloudProviderScanner): """ AWS Resource Scanner implementation. Scans AWS resources across multiple regions and services using boto3, with support for parallel scanning and automatic retry with exponential backoff. """ # All supported AWS services SUPPORTED_SERVICES = [ 'vpc', 'subnet', 'route_table', 'internet_gateway', 'nat_gateway', 'security_group', 'vpc_endpoint', 'vpc_peering', 'customer_gateway', 'virtual_private_gateway', 'vpn_connection', 'ec2', 'elastic_ip', 'autoscaling', 'elb', 'target_group', 'rds', 'elasticache', 'eks', 'lambda', 's3', 's3_event_notification', 'cloudfront', 'route53', 'acm', 'waf', 'sns', 'cloudwatch', 'eventbridge', 'cloudtrail', 'config' ] # Global services (not region-specific) GLOBAL_SERVICES = ['cloudfront', 'route53', 'waf', 's3', 's3_event_notification', 'cloudtrail'] # Maximum workers for parallel scanning MAX_WORKERS = 10 def __init__(self, credential_provider: AWSCredentialProvider): """ Initialize the AWS Scanner. Args: credential_provider: AWSCredentialProvider instance for authentication """ self.credential_provider = credential_provider self._account_id: Optional[str] = None @property def provider_name(self) -> str: return 'AWS' @property def supported_services(self) -> List[str]: return self.SUPPORTED_SERVICES.copy() @property def global_services(self) -> List[str]: return self.GLOBAL_SERVICES.copy() def get_credentials(self, credential_config: Dict[str, Any]) -> boto3.Session: """Get boto3 Session from credential configuration""" return self.credential_provider.get_session() def list_regions(self) -> List[str]: """List all available AWS regions""" try: session = self.credential_provider.get_session(region_name='us-east-1') ec2_client = session.client('ec2') response = ec2_client.describe_regions() return [region['RegionName'] for region in response['Regions']] except Exception as e: logger.error(f"Failed to list regions: {str(e)}") # Return default regions if API call fails return [ 'us-east-1', 'us-east-2', 'us-west-1', 'us-west-2', 'eu-west-1', 'eu-west-2', 'eu-west-3', 'eu-central-1', 'ap-northeast-1', 'ap-northeast-2', 'ap-southeast-1', 'ap-southeast-2', 'ap-south-1', 'sa-east-1', 'ca-central-1' ] def validate_credentials(self) -> bool: """Validate AWS credentials""" return self.credential_provider.validate() def get_account_id(self) -> str: """Get AWS account ID""" if self._account_id: return self._account_id self._account_id = self.credential_provider.get_account_id() return self._account_id def scan_resources( self, regions: List[str], services: Optional[List[str]] = None, progress_callback: Optional[Callable[[int, int, str], None]] = None ) -> ScanResult: """ Scan AWS resources across specified regions and services. Args: regions: List of regions to scan services: Optional list of services to scan (None = all) progress_callback: Optional callback for progress updates Returns: ScanResult with all discovered resources """ result = ScanResult(success=True) account_id = self.get_account_id() # Determine services to scan services_to_scan = services if services else self.SUPPORTED_SERVICES # Separate global and regional services global_services = [s for s in services_to_scan if s in self.GLOBAL_SERVICES] regional_services = [s for s in services_to_scan if s not in self.GLOBAL_SERVICES] # ACM needs special handling: scan selected regions + us-east-1 (for CloudFront) # but avoid duplicate if us-east-1 is already selected acm_regions = list(regions) # Copy the regions list if 'acm' in regional_services and 'us-east-1' not in acm_regions: acm_regions.append('us-east-1') # Calculate total tasks for progress tracking # Add extra task if ACM needs us-east-1 scan acm_extra_task = 1 if ('acm' in regional_services and 'us-east-1' not in regions) else 0 total_tasks = len(global_services) + (len(regional_services) * len(regions)) + acm_extra_task completed_tasks = 0 # Scan global services first (only once, not per region) if global_services: logger.info(f"Scanning {len(global_services)} global services") global_results = self._scan_services_parallel( account_id=account_id, region='us-east-1', # Global services use us-east-1 services=global_services, is_global=True ) for service, resources in global_results['resources'].items(): for resource in resources: result.add_resource(service, resource) for error in global_results['errors']: result.add_error(**error) completed_tasks += len(global_services) if progress_callback: progress_callback(completed_tasks, total_tasks, "Completed global services scan") # Scan regional services in parallel across regions if regional_services and regions: logger.info(f"Scanning {len(regional_services)} services across {len(regions)} regions") with ThreadPoolExecutor(max_workers=self.MAX_WORKERS) as executor: futures = {} for region in regions: future = executor.submit( self._scan_services_parallel, account_id=account_id, region=region, services=regional_services, is_global=False ) futures[future] = region for future in as_completed(futures): region = futures[future] try: region_results = future.result() for service, resources in region_results['resources'].items(): for resource in resources: result.add_resource(service, resource) for error in region_results['errors']: result.add_error(**error) completed_tasks += len(regional_services) if progress_callback: progress_callback( completed_tasks, total_tasks, f"Completed scan for region: {region}" ) except Exception as e: logger.error(f"Failed to scan region {region}: {str(e)}") result.add_error( service='all', region=region, error=f"Region scan failed: {str(e)}", details=None ) completed_tasks += len(regional_services) # Scan ACM in us-east-1 if not already included in regions if 'acm' in regional_services and 'us-east-1' not in regions: logger.info("Scanning ACM certificates in us-east-1 (for CloudFront)") acm_results = self._scan_services_parallel( account_id=account_id, region='us-east-1', services=['acm'], is_global=False ) for service, resources in acm_results['resources'].items(): for resource in resources: result.add_resource(service, resource) for error in acm_results['errors']: result.add_error(**error) completed_tasks += 1 if progress_callback: progress_callback(completed_tasks, total_tasks, "Completed ACM scan in us-east-1") # Set metadata result.metadata = { 'account_id': account_id, 'regions_scanned': regions, 'services_scanned': services_to_scan, 'total_resources': sum(len(r) for r in result.resources.values()), 'total_errors': len(result.errors) } # Mark as failed if there were critical errors if result.errors and not result.resources: result.success = False return result def _scan_services_parallel( self, account_id: str, region: str, services: List[str], is_global: bool = False ) -> Dict[str, Any]: """ Scan multiple services in parallel within a single region. Args: account_id: AWS account ID region: Region to scan services: List of services to scan is_global: Whether these are global services Returns: Dictionary with 'resources' and 'errors' keys """ resources: Dict[str, List[ResourceData]] = {} errors: List[Dict[str, Any]] = [] # Get session for this region try: session = self.credential_provider.get_session(region_name=region) except CredentialError as e: logger.error(f"Failed to get session for region {region}: {str(e)}") return { 'resources': {}, 'errors': [{ 'service': 'all', 'region': region, 'error': str(e), 'details': None }] } # Scan services in parallel with ThreadPoolExecutor(max_workers=self.MAX_WORKERS) as executor: futures = {} for service in services: scanner_method = self._get_scanner_method(service) if scanner_method: future = executor.submit( self._scan_service_safe, scanner_method=scanner_method, session=session, account_id=account_id, region='global' if is_global else region, service=service ) futures[future] = service for future in as_completed(futures): service = futures[future] try: service_result = future.result() if service_result['resources']: resources[service] = service_result['resources'] if service_result['error']: errors.append(service_result['error']) except Exception as e: logger.error(f"Unexpected error scanning {service}: {str(e)}") errors.append({ 'service': service, 'region': region, 'error': str(e), 'details': None }) return {'resources': resources, 'errors': errors} def _scan_service_safe( self, scanner_method: Callable, session: boto3.Session, account_id: str, region: str, service: str ) -> Dict[str, Any]: """ Safely scan a single service, catching and logging errors. Requirements: - 8.2: Record error details in task record (via error dict) Returns: Dictionary with 'resources' and 'error' keys """ try: resources = scanner_method(session, account_id, region) return {'resources': resources, 'error': None} except ClientError as e: error_code = e.response.get('Error', {}).get('Code', 'Unknown') error_message = e.response.get('Error', {}).get('Message', str(e)) stack_trace = traceback.format_exc() logger.warning(f"Error scanning {service} in {region}: {error_code} - {error_message}") return { 'resources': [], 'error': { 'service': service, 'region': region, 'error': f"{error_code}: {error_message}", 'error_type': 'ClientError', 'details': { 'error_code': error_code, 'stack_trace': stack_trace } } } except Exception as e: stack_trace = traceback.format_exc() logger.error(f"Unexpected error scanning {service} in {region}: {str(e)}") return { 'resources': [], 'error': { 'service': service, 'region': region, 'error': str(e), 'error_type': type(e).__name__, 'details': { 'stack_trace': stack_trace } } } def _get_scanner_method(self, service: str) -> Optional[Callable]: """Get the scanner method for a specific service""" scanner_methods = { 'vpc': self._scan_vpcs, 'subnet': self._scan_subnets, 'route_table': self._scan_route_tables, 'internet_gateway': self._scan_internet_gateways, 'nat_gateway': self._scan_nat_gateways, 'security_group': self._scan_security_groups, 'vpc_endpoint': self._scan_vpc_endpoints, 'vpc_peering': self._scan_vpc_peering, 'customer_gateway': self._scan_customer_gateways, 'virtual_private_gateway': self._scan_virtual_private_gateways, 'vpn_connection': self._scan_vpn_connections, 'ec2': self._scan_ec2_instances, 'elastic_ip': self._scan_elastic_ips, 'autoscaling': self._scan_autoscaling_groups, 'elb': self._scan_load_balancers, 'target_group': self._scan_target_groups, 'rds': self._scan_rds_instances, 'elasticache': self._scan_elasticache_clusters, 'eks': self._scan_eks_clusters, 'lambda': self._scan_lambda_functions, 's3': self._scan_s3_buckets, 's3_event_notification': self._scan_s3_event_notifications, 'cloudfront': self._scan_cloudfront_distributions, 'route53': self._scan_route53_hosted_zones, 'acm': self._scan_acm_certificates, 'waf': self._scan_waf_web_acls, 'sns': self._scan_sns_topics, 'cloudwatch': self._scan_cloudwatch_log_groups, 'eventbridge': self._scan_eventbridge_rules, 'cloudtrail': self._scan_cloudtrail_trails, 'config': self._scan_config_recorders, } return scanner_methods.get(service) # Helper method to get resource name from tags def _get_name_from_tags(self, tags: List[Dict[str, str]], default: str = '') -> str: """Extract Name tag value from tags list""" if not tags: return default for tag in tags: if tag.get('Key') == 'Name': return tag.get('Value', default) return default # ==================== VPC Related Scanners ==================== def _scan_vpcs(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """Scan VPCs""" from app.scanners.services.vpc import VPCServiceScanner return VPCServiceScanner.scan_vpcs(session, account_id, region) def _scan_subnets(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """Scan Subnets""" from app.scanners.services.vpc import VPCServiceScanner return VPCServiceScanner.scan_subnets(session, account_id, region) def _scan_route_tables(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """Scan Route Tables""" from app.scanners.services.vpc import VPCServiceScanner return VPCServiceScanner.scan_route_tables(session, account_id, region) def _scan_internet_gateways(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """Scan Internet Gateways""" from app.scanners.services.vpc import VPCServiceScanner return VPCServiceScanner.scan_internet_gateways(session, account_id, region) def _scan_nat_gateways(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """Scan NAT Gateways""" from app.scanners.services.vpc import VPCServiceScanner return VPCServiceScanner.scan_nat_gateways(session, account_id, region) def _scan_security_groups(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """Scan Security Groups""" from app.scanners.services.vpc import VPCServiceScanner return VPCServiceScanner.scan_security_groups(session, account_id, region) def _scan_vpc_endpoints(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """Scan VPC Endpoints""" from app.scanners.services.vpc import VPCServiceScanner return VPCServiceScanner.scan_vpc_endpoints(session, account_id, region) def _scan_vpc_peering(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """Scan VPC Peering Connections""" from app.scanners.services.vpc import VPCServiceScanner return VPCServiceScanner.scan_vpc_peering(session, account_id, region) def _scan_customer_gateways(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """Scan Customer Gateways""" from app.scanners.services.vpc import VPCServiceScanner return VPCServiceScanner.scan_customer_gateways(session, account_id, region) def _scan_virtual_private_gateways(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """Scan Virtual Private Gateways""" from app.scanners.services.vpc import VPCServiceScanner return VPCServiceScanner.scan_virtual_private_gateways(session, account_id, region) def _scan_vpn_connections(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """Scan VPN Connections""" from app.scanners.services.vpc import VPCServiceScanner return VPCServiceScanner.scan_vpn_connections(session, account_id, region) # ==================== EC2 Related Scanners ==================== def _scan_ec2_instances(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """Scan EC2 Instances""" from app.scanners.services.ec2 import EC2ServiceScanner return EC2ServiceScanner.scan_ec2_instances(session, account_id, region) def _scan_elastic_ips(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """Scan Elastic IPs""" from app.scanners.services.ec2 import EC2ServiceScanner return EC2ServiceScanner.scan_elastic_ips(session, account_id, region) # ==================== Auto Scaling and ELB Scanners ==================== def _scan_autoscaling_groups(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """Scan Auto Scaling Groups""" from app.scanners.services.elb import ELBServiceScanner return ELBServiceScanner.scan_autoscaling_groups(session, account_id, region) def _scan_load_balancers(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """Scan Load Balancers""" from app.scanners.services.elb import ELBServiceScanner return ELBServiceScanner.scan_load_balancers(session, account_id, region) def _scan_target_groups(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """Scan Target Groups""" from app.scanners.services.elb import ELBServiceScanner return ELBServiceScanner.scan_target_groups(session, account_id, region) # ==================== Database Service Scanners ==================== def _scan_rds_instances(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """Scan RDS DB Instances""" from app.scanners.services.database import DatabaseServiceScanner return DatabaseServiceScanner.scan_rds_instances(session, account_id, region) def _scan_elasticache_clusters(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """Scan ElastiCache Clusters""" from app.scanners.services.database import DatabaseServiceScanner return DatabaseServiceScanner.scan_elasticache_clusters(session, account_id, region) # ==================== Compute and Storage Service Scanners ==================== def _scan_eks_clusters(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """Scan EKS Clusters""" from app.scanners.services.compute import ComputeServiceScanner return ComputeServiceScanner.scan_eks_clusters(session, account_id, region) def _scan_lambda_functions(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """Scan Lambda Functions""" from app.scanners.services.compute import ComputeServiceScanner return ComputeServiceScanner.scan_lambda_functions(session, account_id, region) def _scan_s3_buckets(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """Scan S3 Buckets""" from app.scanners.services.compute import ComputeServiceScanner return ComputeServiceScanner.scan_s3_buckets(session, account_id, region) def _scan_s3_event_notifications(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """Scan S3 Event Notifications""" from app.scanners.services.compute import ComputeServiceScanner return ComputeServiceScanner.scan_s3_event_notifications(session, account_id, region) # ==================== Global Service Scanners ==================== def _scan_cloudfront_distributions(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """Scan CloudFront Distributions""" from app.scanners.services.global_services import GlobalServiceScanner return GlobalServiceScanner.scan_cloudfront_distributions(session, account_id, region) def _scan_route53_hosted_zones(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """Scan Route 53 Hosted Zones""" from app.scanners.services.global_services import GlobalServiceScanner return GlobalServiceScanner.scan_route53_hosted_zones(session, account_id, region) def _scan_acm_certificates(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """Scan ACM Certificates""" from app.scanners.services.global_services import GlobalServiceScanner return GlobalServiceScanner.scan_acm_certificates(session, account_id, region) def _scan_waf_web_acls(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """Scan WAF Web ACLs""" from app.scanners.services.global_services import GlobalServiceScanner return GlobalServiceScanner.scan_waf_web_acls(session, account_id, region) # ==================== Monitoring and Management Service Scanners ==================== def _scan_sns_topics(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """Scan SNS Topics""" from app.scanners.services.monitoring import MonitoringServiceScanner return MonitoringServiceScanner.scan_sns_topics(session, account_id, region) def _scan_cloudwatch_log_groups(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """Scan CloudWatch Log Groups""" from app.scanners.services.monitoring import MonitoringServiceScanner return MonitoringServiceScanner.scan_cloudwatch_log_groups(session, account_id, region) def _scan_eventbridge_rules(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """Scan EventBridge Rules""" from app.scanners.services.monitoring import MonitoringServiceScanner return MonitoringServiceScanner.scan_eventbridge_rules(session, account_id, region) def _scan_cloudtrail_trails(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """Scan CloudTrail Trails""" from app.scanners.services.monitoring import MonitoringServiceScanner return MonitoringServiceScanner.scan_cloudtrail_trails(session, account_id, region) def _scan_config_recorders(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """Scan AWS Config Recorders""" from app.scanners.services.monitoring import MonitoringServiceScanner return MonitoringServiceScanner.scan_config_recorders(session, account_id, region)